diff --git a/docker-compose.regtest.yml b/docker-compose.regtest.yml index 562ee309f..1bfc818f5 100644 --- a/docker-compose.regtest.yml +++ b/docker-compose.regtest.yml @@ -20,7 +20,7 @@ services: container_name: nbxplorer ports: - 32838:32838 - image: nicolasdorier/nbxplorer:2.5.30 + image: nicolasdorier/nbxplorer:2.5.30-1 environment: - NBXPLORER_NETWORK=regtest - NBXPLORER_CHAINS=btc diff --git a/internal/core/application/indexer.go b/internal/core/application/indexer.go index a637cff16..f7e926e36 100644 --- a/internal/core/application/indexer.go +++ b/internal/core/application/indexer.go @@ -569,19 +569,20 @@ func (i *indexerService) walkVtxoChain( chain := make([]ChainTx, 0) nextVtxos := frontier visited := make(map[string]bool) + offchainTxCache := make(map[string]*domain.OffchainTx) allOutpoints := make([]Outpoint, 0) // Lazy cache for VTXOs loaded during this page. vtxoCache := make(map[string]domain.Vtxo) loadedMarkers := make(map[string]bool) - // Eagerly preload VTXOs by walking the marker DAG upward. + // Eagerly preload VTXOs and offchain txs by walking the marker DAG upward. if i.repoManager.Markers() != nil { startVtxos, err := i.repoManager.Vtxos().GetVtxos(ctx, nextVtxos) if err != nil { return nil, nil, "", err } - if err := i.preloadVtxosByMarkers(ctx, startVtxos, vtxoCache); err != nil { + if err := i.preloadByMarkers(ctx, startVtxos, vtxoCache, offchainTxCache); err != nil { return nil, nil, "", err } } @@ -601,6 +602,33 @@ func (i *indexerService) walkVtxoChain( return nil, nil, "", fmt.Errorf("vtxo not found for outpoint: %v", nextVtxos) } + missingOffchainTxids := make(map[string]struct{}) + for _, vtxo := range vtxos { + if !vtxo.Preconfirmed { + continue + } + if _, ok := offchainTxCache[vtxo.Txid]; ok { + continue + } + missingOffchainTxids[vtxo.Txid] = struct{}{} + } + + if len(missingOffchainTxids) > 0 { + txids := make([]string, 0, len(missingOffchainTxids)) + for txid := range missingOffchainTxids { + txids = append(txids, txid) + } + + offchainTxs, err := i.repoManager.OffchainTxs().GetOffchainTxsByTxids(ctx, txids) + if err != nil { + return nil, nil, "", fmt.Errorf("failed to retrieve offchain txs: %s", err) + } + + for _, tx := range offchainTxs { + offchainTxCache[tx.ArkTxid] = tx + } + } + newNextVtxos := make([]domain.Outpoint, 0) for _, vtxo := range vtxos { key := vtxo.Outpoint.String() @@ -630,9 +658,14 @@ func (i *indexerService) walkVtxoChain( // also, we have to populate the newNextVtxos with the checkpoints inputs // in order to continue the chain in the next iteration if vtxo.Preconfirmed { - offchainTx, err := i.repoManager.OffchainTxs().GetOffchainTx(ctx, vtxo.Txid) - if err != nil { - return nil, nil, "", fmt.Errorf("failed to retrieve offchain tx: %s", err) + offchainTx, ok := offchainTxCache[vtxo.Txid] + if !ok { + var err error + offchainTx, err = i.repoManager.OffchainTxs().GetOffchainTx(ctx, vtxo.Txid) + if err != nil { + return nil, nil, "", fmt.Errorf("failed to retrieve offchain tx: %s", err) + } + offchainTxCache[vtxo.Txid] = offchainTx } chainTx := ChainTx{ @@ -741,7 +774,6 @@ func (i *indexerService) walkVtxoChain( nextVtxos = newNextVtxos } - // Chain exhausted — no more pages. return chain, allOutpoints, "", nil } @@ -775,20 +807,22 @@ func decodeChainCursor(token string) ([]domain.Outpoint, error) { return outpoints, nil } -// preloadVtxosByMarkers bulk-fetches VTXOs by walking the marker DAG upward -// from the markers of startVtxos. This reduces DB round-trips from O(chain_length) -// to O(chain_length / MarkerInterval). -func (i *indexerService) preloadVtxosByMarkers( +// preloadByMarkers bulk-fetches VTXOs and their offchain txs by walking the +// marker DAG upward from the markers of startVtxos. This reduces DB round-trips +// from O(chain_length) to O(chain_length / MarkerInterval) for both layers. +func (i *indexerService) preloadByMarkers( ctx context.Context, startVtxos []domain.Vtxo, - cache map[string]domain.Vtxo, + vtxoCache map[string]domain.Vtxo, + offchainTxCache map[string]*domain.OffchainTx, ) error { markerRepo := i.repoManager.Markers() + offchainTxRepo := i.repoManager.OffchainTxs() // Seed cache and collect initial marker IDs. currentMarkerIDs := make(map[string]bool) for _, v := range startVtxos { - cache[v.Outpoint.String()] = v + vtxoCache[v.Outpoint.String()] = v for _, mid := range v.MarkerIDs { currentMarkerIDs[mid] = true } @@ -809,8 +843,38 @@ func (i *indexerService) preloadVtxosByMarkers( return err } for _, v := range vtxos { - if _, ok := cache[v.Outpoint.String()]; !ok { - cache[v.Outpoint.String()] = v + if _, ok := vtxoCache[v.Outpoint.String()]; !ok { + vtxoCache[v.Outpoint.String()] = v + } + } + + // Piggyback: bulk-fetch the offchain txs for the preconfirmed VTXOs + // in this window, so the walk loop never has to hit the DB per-hop. + missingTxids := make([]string, 0, len(vtxos)) + seen := make(map[string]bool, len(vtxos)) + for _, v := range vtxos { + if !v.Preconfirmed { + continue + } + if seen[v.Txid] { + continue + } + seen[v.Txid] = true + if _, ok := offchainTxCache[v.Txid]; ok { + continue + } + missingTxids = append(missingTxids, v.Txid) + } + // offchainTxRepo may be nil in test helpers that do not wire up the + // offchain-tx repo. Skip the piggyback in that case — the walk loop + // will fall back to its own in-loop bulk fetch for any cache misses. + if len(missingTxids) > 0 && offchainTxRepo != nil { + offchainTxs, err := offchainTxRepo.GetOffchainTxsByTxids(ctx, missingTxids) + if err != nil { + return err + } + for _, tx := range offchainTxs { + offchainTxCache[tx.ArkTxid] = tx } } diff --git a/internal/core/application/indexer_bench_test.go b/internal/core/application/indexer_bench_test.go index 312edf668..acb9ac8b3 100644 --- a/internal/core/application/indexer_bench_test.go +++ b/internal/core/application/indexer_bench_test.go @@ -3,13 +3,18 @@ package application import ( "context" "fmt" + "sort" "strings" + "sync" + "sync/atomic" "testing" + "time" "github.com/arkade-os/arkd/internal/core/domain" "github.com/btcsuite/btcd/btcutil/psbt" "github.com/btcsuite/btcd/chaincfg/chainhash" "github.com/btcsuite/btcd/wire" + "github.com/stretchr/testify/require" ) // Lightweight fake repos for benchmarks — no testify/mock overhead. @@ -92,12 +97,24 @@ func (r *benchOffchainTxRepo) GetOffchainTx( return &domain.OffchainTx{CheckpointTxs: map[string]string{}}, nil } +func (r *benchOffchainTxRepo) GetOffchainTxsByTxids( + _ context.Context, txids []string, +) ([]*domain.OffchainTx, error) { + result := make([]*domain.OffchainTx, 0, len(txids)) + for _, txid := range txids { + if tx, ok := r.txs[txid]; ok { + result = append(result, tx) + } + } + return result, nil +} + func (r *benchOffchainTxRepo) Close() {} type benchRepoManager struct { vtxoRepo *benchVtxoRepo markerRepo *benchMarkerRepo - offchainRepo *benchOffchainTxRepo + offchainRepo domain.OffchainTxRepository } func (m *benchRepoManager) Events() domain.EventRepository { return nil } @@ -172,12 +189,16 @@ func buildLinearChain(n int, withMarkers bool) (*indexerService, domain.Outpoint if i < n-1 { offchainRepo.txs[tid] = &domain.OffchainTx{ + ArkTxid: tid, CheckpointTxs: map[string]string{ fmt.Sprintf("cp-%d", i): benchCheckpointPSBT(benchTxid(i+1), 0), }, } } else { - offchainRepo.txs[tid] = &domain.OffchainTx{CheckpointTxs: map[string]string{}} + offchainRepo.txs[tid] = &domain.OffchainTx{ + ArkTxid: tid, + CheckpointTxs: map[string]string{}, + } } } @@ -414,3 +435,441 @@ func BenchmarkCheckpointPSBTParse(b *testing.B) { } } } + +// countingOffchainTxRepo wraps benchOffchainTxRepo and counts calls. +type countingOffchainTxRepo struct { + inner *benchOffchainTxRepo + singleCalls atomic.Int64 + bulkCalls atomic.Int64 + latencyPerCall time.Duration +} + +func (r *countingOffchainTxRepo) GetOffchainTx( + ctx context.Context, txid string, +) (*domain.OffchainTx, error) { + r.singleCalls.Add(1) + if r.latencyPerCall > 0 { + time.Sleep(r.latencyPerCall) + } + return r.inner.GetOffchainTx(ctx, txid) +} + +func (r *countingOffchainTxRepo) GetOffchainTxsByTxids( + ctx context.Context, txids []string, +) ([]*domain.OffchainTx, error) { + r.bulkCalls.Add(1) + if r.latencyPerCall > 0 { + time.Sleep(r.latencyPerCall) // one round-trip regardless of batch size + } + return r.inner.GetOffchainTxsByTxids(ctx, txids) +} + +func (r *countingOffchainTxRepo) AddOrUpdateOffchainTx( + _ context.Context, _ *domain.OffchainTx, +) error { + return nil +} + +func (r *countingOffchainTxRepo) Close() {} + +func (r *countingOffchainTxRepo) reset() { + r.singleCalls.Store(0) + r.bulkCalls.Store(0) +} + +// noBulkOffchainTxRepo is like benchOffchainTxRepo but GetOffchainTxsByTxids +// always returns empty, forcing the fallback to individual GetOffchainTx calls. +// This simulates the pre-optimization behavior. +type noBulkOffchainTxRepo struct { + countingOffchainTxRepo +} + +func (r *noBulkOffchainTxRepo) GetOffchainTxsByTxids( + _ context.Context, _ []string, +) ([]*domain.OffchainTx, error) { + r.bulkCalls.Add(1) + return []*domain.OffchainTx{}, nil +} + +// TestBulkOffchainTxReducesDBCalls verifies that the bulk prefetch reduces the +// number of DB round-trips. Uses a fanout tree where each iteration processes +// multiple VTXOs — bulk fetches all offchain txs in one call per iteration +// instead of one call per VTXO. +func TestBulkOffchainTxReducesDBCalls(t *testing.T) { + const depth = 8 // 2^9 - 1 = 511 VTXOs + ctx := context.Background() + + // Build fanout tree data (reuse the helper's repo setup). + n := (1 << (depth + 1)) - 1 + vtxoRepo := &benchVtxoRepo{vtxos: make(map[string]domain.Vtxo, n)} + innerRepo := &benchOffchainTxRepo{txs: make(map[string]*domain.OffchainTx, n)} + + for i := 0; i < n; i++ { + tid := benchTxid(i) + vtxoRepo.vtxos[fmt.Sprintf("%s:0", tid)] = domain.Vtxo{ + Outpoint: domain.Outpoint{Txid: tid, VOut: 0}, + Preconfirmed: true, + ExpiresAt: int64(1000 + i), + } + left := 2*i + 1 + right := 2*i + 2 + if left < n && right < n { + innerRepo.txs[tid] = &domain.OffchainTx{ + ArkTxid: tid, + CheckpointTxs: map[string]string{ + fmt.Sprintf("cp-l-%d", i): benchCheckpointPSBT(benchTxid(left), 0), + fmt.Sprintf("cp-r-%d", i): benchCheckpointPSBT(benchTxid(right), 0), + }, + } + } else { + innerRepo.txs[tid] = &domain.OffchainTx{ + ArkTxid: tid, + CheckpointTxs: map[string]string{}, + } + } + } + + start := Outpoint{Txid: benchTxid(0), VOut: 0} + + // With bulk fetch (current behavior). + bulkRepo := &countingOffchainTxRepo{inner: innerRepo} + svc := &indexerService{repoManager: &benchRepoManager{ + vtxoRepo: vtxoRepo, offchainRepo: bulkRepo, + }} + resp, err := svc.GetVtxoChain(ctx, "", start, nil, "") + require.NoError(t, err) + + bulkSingle := bulkRepo.singleCalls.Load() + bulkBulk := bulkRepo.bulkCalls.Load() + + // Without bulk fetch (simulated pre-optimization: bulk returns empty). + noBulkRepo := &noBulkOffchainTxRepo{countingOffchainTxRepo{inner: innerRepo}} + svc2 := &indexerService{repoManager: &benchRepoManager{ + vtxoRepo: vtxoRepo, offchainRepo: noBulkRepo, + }} + resp2, err := svc2.GetVtxoChain(ctx, "", start, nil, "") + require.NoError(t, err) + require.Equal(t, len(resp.Chain), len(resp2.Chain)) + + noBulkSingle := noBulkRepo.singleCalls.Load() + + t.Logf("fanout tree: depth=%d, %d VTXOs", depth, n) + t.Logf("WITH bulk: %d bulk calls, %d individual calls (total round-trips: %d)", + bulkBulk, bulkSingle, bulkBulk+bulkSingle) + t.Logf("WITHOUT bulk: %d individual calls (total round-trips: %d)", + noBulkSingle, noBulkSingle) + + // With bulk fetch, individual calls should be 0 (all served from cache). + require.Zero(t, bulkSingle, "bulk prefetch should eliminate individual GetOffchainTx calls") + // Bulk calls = depth+1 iterations (one per tree level), much fewer than N VTXOs. + require.LessOrEqual(t, bulkBulk, int64(depth+1), + "bulk calls should equal tree depth (one per iteration)") + // Without bulk, individual calls == N (one per preconfirmed VTXO). + require.Equal(t, int64(n), noBulkSingle, + "without bulk, every VTXO triggers an individual call") +} + +// BenchmarkOffchainTxBulkVsSingle compares chain traversal with and without +// the bulk offchain tx prefetch, using simulated DB latency to make the +// round-trip reduction visible in wall-clock time. Uses a fanout tree +// (depth 8, 511 VTXOs) where each iteration processes an exponentially +// growing number of VTXOs — the bulk path does 9 round-trips vs 511 +// individual calls without it. +func BenchmarkOffchainTxBulkVsSingle(b *testing.B) { + const depth = 8 + const simulatedLatency = 50 * time.Microsecond + + n := (1 << (depth + 1)) - 1 + vtxoRepo := &benchVtxoRepo{vtxos: make(map[string]domain.Vtxo, n)} + innerRepo := &benchOffchainTxRepo{txs: make(map[string]*domain.OffchainTx, n)} + + for i := 0; i < n; i++ { + tid := benchTxid(i) + vtxoRepo.vtxos[fmt.Sprintf("%s:0", tid)] = domain.Vtxo{ + Outpoint: domain.Outpoint{Txid: tid, VOut: 0}, + Preconfirmed: true, + ExpiresAt: int64(1000 + i), + } + left := 2*i + 1 + right := 2*i + 2 + if left < n && right < n { + innerRepo.txs[tid] = &domain.OffchainTx{ + ArkTxid: tid, + CheckpointTxs: map[string]string{ + fmt.Sprintf("cp-l-%d", i): benchCheckpointPSBT(benchTxid(left), 0), + fmt.Sprintf("cp-r-%d", i): benchCheckpointPSBT(benchTxid(right), 0), + }, + } + } else { + innerRepo.txs[tid] = &domain.OffchainTx{ + ArkTxid: tid, + CheckpointTxs: map[string]string{}, + } + } + } + + start := Outpoint{Txid: benchTxid(0), VOut: 0} + ctx := context.Background() + + b.Run(fmt.Sprintf("bulk_prefetch/%d_vtxos", n), func(b *testing.B) { + repo := &countingOffchainTxRepo{inner: innerRepo, latencyPerCall: simulatedLatency} + svc := &indexerService{repoManager: &benchRepoManager{ + vtxoRepo: vtxoRepo, offchainRepo: repo, + }} + b.ReportAllocs() + b.ResetTimer() + for i := 0; i < b.N; i++ { + repo.reset() + _, err := svc.GetVtxoChain(ctx, "", start, nil, "") + if err != nil { + b.Fatal(err) + } + } + b.StopTimer() + b.ReportMetric(float64(repo.bulkCalls.Load())/float64(b.N), "bulk_calls/op") + b.ReportMetric(float64(repo.singleCalls.Load())/float64(b.N), "single_calls/op") + }) + + b.Run(fmt.Sprintf("no_bulk_fallback/%d_vtxos", n), func(b *testing.B) { + repo := &noBulkOffchainTxRepo{countingOffchainTxRepo{inner: innerRepo, latencyPerCall: simulatedLatency}} + svc := &indexerService{repoManager: &benchRepoManager{ + vtxoRepo: vtxoRepo, offchainRepo: repo, + }} + b.ReportAllocs() + b.ResetTimer() + for i := 0; i < b.N; i++ { + repo.reset() + _, err := svc.GetVtxoChain(ctx, "", start, nil, "") + if err != nil { + b.Fatal(err) + } + } + b.StopTimer() + b.ReportMetric(float64(repo.bulkCalls.Load())/float64(b.N), "bulk_calls/op") + b.ReportMetric(float64(repo.singleCalls.Load())/float64(b.N), "single_calls/op") + }) +} + +// phaseTimings accumulates per-phase wall-clock time and call counts across +// the wrapped repo methods. Safe for concurrent recording. +type phaseTimings struct { + mu sync.Mutex + totals map[string]time.Duration + counts map[string]int +} + +func newPhaseTimings() *phaseTimings { + return &phaseTimings{ + totals: make(map[string]time.Duration), + counts: make(map[string]int), + } +} + +func (p *phaseTimings) record(phase string, d time.Duration) { + p.mu.Lock() + p.totals[phase] += d + p.counts[phase]++ + p.mu.Unlock() +} + +func (p *phaseTimings) log(t *testing.T, header string, wall time.Duration) { + t.Helper() + p.mu.Lock() + defer p.mu.Unlock() + + phases := make([]string, 0, len(p.totals)) + var repoTotal time.Duration + for name, d := range p.totals { + phases = append(phases, name) + repoTotal += d + } + sort.Strings(phases) + + t.Logf("%s", header) + t.Logf(" %-32s %12s", "wall clock (GetVtxoChain)", wall) + for _, name := range phases { + t.Logf(" %-32s %12s (%d calls)", name, p.totals[name], p.counts[name]) + } + t.Logf(" %-32s %12s", "sum of repo phases", repoTotal) + t.Logf(" %-32s %12s", "other (psbt parse + overhead)", wall-repoTotal) +} + +// timingVtxoRepo wraps a VtxoRepository and records per-call latency into a +// shared phaseTimings. An optional per-call latency simulates DB round-trip +// cost so the relative phase times are visible when running against fakes. +type timingVtxoRepo struct { + domain.VtxoRepository + inner domain.VtxoRepository + t *phaseTimings + latencyPerCall time.Duration +} + +func (r *timingVtxoRepo) GetVtxos( + ctx context.Context, outpoints []domain.Outpoint, +) ([]domain.Vtxo, error) { + start := time.Now() + defer func() { r.t.record("Vtxos.GetVtxos", time.Since(start)) }() + if r.latencyPerCall > 0 { + time.Sleep(r.latencyPerCall) + } + return r.inner.GetVtxos(ctx, outpoints) +} + +func (r *timingVtxoRepo) Close() {} + +type timingMarkerRepo struct { + domain.MarkerRepository + inner domain.MarkerRepository + t *phaseTimings + latencyPerCall time.Duration +} + +func (r *timingMarkerRepo) GetVtxoChainByMarkers( + ctx context.Context, markerIDs []string, +) ([]domain.Vtxo, error) { + start := time.Now() + defer func() { r.t.record("Markers.GetVtxoChainByMarkers", time.Since(start)) }() + if r.latencyPerCall > 0 { + time.Sleep(r.latencyPerCall) + } + return r.inner.GetVtxoChainByMarkers(ctx, markerIDs) +} + +func (r *timingMarkerRepo) GetMarkersByIds( + ctx context.Context, ids []string, +) ([]domain.Marker, error) { + start := time.Now() + defer func() { r.t.record("Markers.GetMarkersByIds", time.Since(start)) }() + if r.latencyPerCall > 0 { + time.Sleep(r.latencyPerCall) + } + return r.inner.GetMarkersByIds(ctx, ids) +} + +func (r *timingMarkerRepo) GetVtxosByMarker( + ctx context.Context, markerID string, +) ([]domain.Vtxo, error) { + start := time.Now() + defer func() { r.t.record("Markers.GetVtxosByMarker", time.Since(start)) }() + if r.latencyPerCall > 0 { + time.Sleep(r.latencyPerCall) + } + return r.inner.GetVtxosByMarker(ctx, markerID) +} + +func (r *timingMarkerRepo) Close() {} + +type timingOffchainTxRepo struct { + domain.OffchainTxRepository + inner domain.OffchainTxRepository + t *phaseTimings + latencyPerCall time.Duration +} + +func (r *timingOffchainTxRepo) GetOffchainTx( + ctx context.Context, txid string, +) (*domain.OffchainTx, error) { + start := time.Now() + defer func() { r.t.record("OffchainTxs.GetOffchainTx", time.Since(start)) }() + if r.latencyPerCall > 0 { + time.Sleep(r.latencyPerCall) + } + return r.inner.GetOffchainTx(ctx, txid) +} + +func (r *timingOffchainTxRepo) GetOffchainTxsByTxids( + ctx context.Context, txids []string, +) ([]*domain.OffchainTx, error) { + start := time.Now() + defer func() { r.t.record("OffchainTxs.GetOffchainTxsByTxids", time.Since(start)) }() + if r.latencyPerCall > 0 { + time.Sleep(r.latencyPerCall) + } + return r.inner.GetOffchainTxsByTxids(ctx, txids) +} + +func (r *timingOffchainTxRepo) AddOrUpdateOffchainTx( + _ context.Context, _ *domain.OffchainTx, +) error { + return nil +} + +func (r *timingOffchainTxRepo) Close() {} + +// TestVtxoChainTimingBreakdown builds a deep linear chain and runs +// GetVtxoChain against it with timing-decorated repos, logging a per-phase +// wall-clock breakdown. This is the in-process replacement for the server-side +// timing log that previously lived in walkVtxoChain. +// +// The repos use an in-memory backing store and inject a fixed per-call +// simulatedLatency via time.Sleep, so the absolute numbers in the breakdown +// do NOT reflect real DB cost — they are only meaningful as relative phase +// proportions under a uniform latency assumption. +// +// Run with: +// +// go test -v -run TestVtxoChainTimingBreakdown ./internal/core/application/... +func TestVtxoChainTimingBreakdown(t *testing.T) { + const ( + chainLen = 10000 + simulatedLatency = 50 * time.Microsecond + ) + + ctx := context.Background() + + // Reuse buildLinearChain to get the same data layout the perf test produces, + // then swap its repo manager for a timing-decorated one. + svc, start := buildLinearChain(chainLen, true) + inner := svc.repoManager.(*benchRepoManager) + + timings := newPhaseTimings() + svc.repoManager = &wrappedRepoManager{ + vtxos: &timingVtxoRepo{ + inner: inner.vtxoRepo, t: timings, latencyPerCall: simulatedLatency, + }, + markers: &timingMarkerRepo{ + inner: inner.markerRepo, t: timings, latencyPerCall: simulatedLatency, + }, + offchainTxs: &timingOffchainTxRepo{ + inner: inner.offchainRepo, t: timings, latencyPerCall: simulatedLatency, + }, + } + + wallStart := time.Now() + resp, err := svc.GetVtxoChain(ctx, "", start, nil, "") + wall := time.Since(wallStart) + require.NoError(t, err) + require.Equal(t, 2*chainLen-1, len(resp.Chain)) + + timings.log(t, fmt.Sprintf( + "GetVtxoChain timing breakdown: linear chain n=%d, simulated repo latency=%s", + chainLen, simulatedLatency, + ), wall) +} + +// wrappedRepoManager is a minimal RepoManager that exposes only the repos +// walkVtxoChain touches. Unwired accessors panic with a descriptive message +// instead of returning nil, so an accidental dependency on one of them +// surfaces as a clear failure rather than a nil-pointer dereference. +type wrappedRepoManager struct { + vtxos domain.VtxoRepository + markers domain.MarkerRepository + offchainTxs domain.OffchainTxRepository +} + +func (m *wrappedRepoManager) Events() domain.EventRepository { panic("Events: not wired") } +func (m *wrappedRepoManager) Rounds() domain.RoundRepository { panic("Rounds: not wired") } +func (m *wrappedRepoManager) Vtxos() domain.VtxoRepository { return m.vtxos } +func (m *wrappedRepoManager) Markers() domain.MarkerRepository { + return m.markers +} +func (m *wrappedRepoManager) ScheduledSession() domain.ScheduledSessionRepo { + panic("ScheduledSession: not wired") +} +func (m *wrappedRepoManager) OffchainTxs() domain.OffchainTxRepository { return m.offchainTxs } +func (m *wrappedRepoManager) Convictions() domain.ConvictionRepository { + panic("Convictions: not wired") +} +func (m *wrappedRepoManager) Assets() domain.AssetRepository { panic("Assets: not wired") } +func (m *wrappedRepoManager) Fees() domain.FeeRepository { panic("Fees: not wired") } +func (m *wrappedRepoManager) Close() {} diff --git a/internal/core/application/indexer_exposure_test.go b/internal/core/application/indexer_exposure_test.go index 9ca69d6f6..0edf9cdaa 100644 --- a/internal/core/application/indexer_exposure_test.go +++ b/internal/core/application/indexer_exposure_test.go @@ -585,6 +585,71 @@ func TestGetVtxoChain(t *testing.T) { rounds.AssertExpectations(t) vtxos.AssertExpectations(t) }) + + t.Run("preconfirmed chain bulk-loads offchain txs", func(t *testing.T) { + vtxoOutpoint := Outpoint{Txid: testTxids[0], VOut: 0} + offchainTxid := vtxoOutpoint.Txid + checkpointB64 := buildCheckpointTxSpending(t, vtxoOutpoint.Txid, vtxoOutpoint.VOut) + + vtxos := &mockedVtxoRepo{} + vtxos.On("GetVtxos", mock.Anything, []domain.Outpoint{vtxoOutpoint}). + Return([]domain.Vtxo{{ + Outpoint: domain.Outpoint{Txid: vtxoOutpoint.Txid, VOut: vtxoOutpoint.VOut}, + Preconfirmed: true, + }}, nil) + + offchainRepo := &mockedOffchainTxRepo{} + offchainRepo.On("GetOffchainTxsByTxids", mock.Anything, []string{offchainTxid}). + Return([]*domain.OffchainTx{{ + ArkTxid: offchainTxid, + CheckpointTxs: map[string]string{ + "cp": checkpointB64, + }, + }}, nil) + + indexer := newTestIndexer(t, privkey, exposurePrivate, nil, vtxos, nil, offchainRepo) + + chain, _, _, err := indexer.walkVtxoChain(t.Context(), []domain.Outpoint{vtxoOutpoint}, 1000) + require.NoError(t, err) + require.NotEmpty(t, chain) + + offchainRepo.AssertNotCalled(t, "GetOffchainTx", mock.Anything, offchainTxid) + offchainRepo.AssertExpectations(t) + vtxos.AssertExpectations(t) + }) + + t.Run("preconfirmed chain falls back to single fetch on cache miss", func(t *testing.T) { + vtxoOutpoint := Outpoint{Txid: testTxids[0], VOut: 0} + offchainTxid := vtxoOutpoint.Txid + checkpointB64 := buildCheckpointTxSpending(t, vtxoOutpoint.Txid, vtxoOutpoint.VOut) + + vtxos := &mockedVtxoRepo{} + vtxos.On("GetVtxos", mock.Anything, []domain.Outpoint{vtxoOutpoint}). + Return([]domain.Vtxo{{ + Outpoint: domain.Outpoint{Txid: vtxoOutpoint.Txid, VOut: vtxoOutpoint.VOut}, + Preconfirmed: true, + }}, nil) + + offchainRepo := &mockedOffchainTxRepo{} + offchainRepo.On("GetOffchainTxsByTxids", mock.Anything, []string{offchainTxid}). + Return([]*domain.OffchainTx{}, nil) + offchainRepo.On("GetOffchainTx", mock.Anything, offchainTxid). + Return(&domain.OffchainTx{ + ArkTxid: offchainTxid, + CheckpointTxs: map[string]string{ + "cp": checkpointB64, + }, + }, nil) + + indexer := newTestIndexer(t, privkey, exposurePrivate, nil, vtxos, nil, offchainRepo) + + chain, _, _, err := indexer.walkVtxoChain(t.Context(), []domain.Outpoint{vtxoOutpoint}, 1000) + require.NoError(t, err) + require.NotEmpty(t, chain) + + offchainRepo.AssertExpectations(t) + vtxos.AssertExpectations(t) + }) }) t.Run("invalid", func(t *testing.T) { @@ -927,6 +992,7 @@ func TestStripSignerSignatures(t *testing.T) { func newTestIndexer( t *testing.T, privkey *btcec.PrivateKey, exposure exposure, rounds *mockedRoundRepo, vtxos *mockedVtxoRepo, wallet *mockedWallet, + offchainRepos ...*mockedOffchainTxRepo, ) *indexerService { t.Helper() @@ -940,6 +1006,9 @@ func newTestIndexer( if vtxos != nil { repo.On("Vtxos").Return(vtxos) } + if len(offchainRepos) > 0 && offchainRepos[0] != nil { + repo.On("OffchainTxs").Return(offchainRepos[0]) + } cache := newTokenCache(defaultAuthTokenTTL) t.Cleanup(cache.close) @@ -995,6 +1064,25 @@ func buildTestTreeTxs(t *testing.T) (rootTxid, leafTxid string, flatTree arktree return } +func buildCheckpointTxSpending(t *testing.T, prevTxid string, prevVout uint32) string { + t.Helper() + + prevHash, err := chainhash.NewHashFromStr(prevTxid) + require.NoError(t, err) + + ptx, err := psbt.New( + []*wire.OutPoint{{Hash: *prevHash, Index: prevVout}}, + []*wire.TxOut{{Value: 1000, PkScript: []byte{txscript.OP_TRUE}}}, + 2, 0, []uint32{wire.MaxTxInSequenceNum}, + ) + require.NoError(t, err) + + b64, err := ptx.B64Encode() + require.NoError(t, err) + + return b64 +} + // buildTestIntent creates a valid signed intent proof that passes intent.Verify. // It builds a MultisigClosure with vtxoKey, derives the taproot output key from // that closure, signs input 1, and returns the intent plus the taproot key (so @@ -1152,6 +1240,36 @@ func (m *mockedRepoManager) Vtxos() domain.VtxoRepository { return nil } +func (m *mockedRepoManager) OffchainTxs() domain.OffchainTxRepository { + if v := m.Called().Get(0); v != nil { + return v.(domain.OffchainTxRepository) + } + return nil +} + +type mockedOffchainTxRepo struct { + mock.Mock + domain.OffchainTxRepository // unimplemented methods panic on call +} + +func (m *mockedOffchainTxRepo) GetOffchainTx(ctx context.Context, txid string) (*domain.OffchainTx, error) { + args := m.Called(ctx, txid) + if v := args.Get(0); v != nil { + return v.(*domain.OffchainTx), args.Error(1) + } + return nil, args.Error(1) +} + +func (m *mockedOffchainTxRepo) GetOffchainTxsByTxids( + ctx context.Context, txids []string, +) ([]*domain.OffchainTx, error) { + args := m.Called(ctx, txids) + if v := args.Get(0); v != nil { + return v.([]*domain.OffchainTx), args.Error(1) + } + return nil, args.Error(1) +} + type mockedWallet struct { mock.Mock ports.WalletService // unimplemented methods panic on call diff --git a/internal/core/application/indexer_test.go b/internal/core/application/indexer_test.go index 3b3ed31b5..d5861ebc8 100644 --- a/internal/core/application/indexer_test.go +++ b/internal/core/application/indexer_test.go @@ -303,6 +303,16 @@ func (m *mockOffchainTxRepoForIndexer) GetOffchainTx( return args.Get(0).(*domain.OffchainTx), args.Error(1) } +func (m *mockOffchainTxRepoForIndexer) GetOffchainTxsByTxids( + ctx context.Context, txids []string, +) ([]*domain.OffchainTx, error) { + args := m.Called(ctx, txids) + if args.Get(0) == nil { + return nil, args.Error(1) + } + return args.Get(0).([]*domain.OffchainTx), args.Error(1) +} + func (m *mockOffchainTxRepoForIndexer) AddOrUpdateOffchainTx( ctx context.Context, offchainTx *domain.OffchainTx, ) error { @@ -361,6 +371,10 @@ func newChainTestIndexerWithOffchain() ( vtxoRepo := &mockVtxoRepoForIndexer{} markerRepo := &mockMarkerRepoForIndexer{} offchainTxRepo := &mockOffchainTxRepoForIndexer{} + // Default: bulk fetch returns empty so the fallback to GetOffchainTx is used. + // Tests that want to verify bulk behavior can override with a more specific expectation. + offchainTxRepo.On("GetOffchainTxsByTxids", mock.Anything, mock.Anything). + Return([]*domain.OffchainTx{}, nil).Maybe() repoManager := &mockRepoManagerForIndexer{ vtxos: vtxoRepo, markers: markerRepo, offchainTxs: offchainTxRepo, } @@ -737,12 +751,23 @@ func setupPreconfirmedChain( cpA := makeCheckpointPSBT(t, txidB, 0) cpB := makeCheckpointPSBT(t, txidC, 0) + offchainTxA := &domain.OffchainTx{ArkTxid: txidA, CheckpointTxs: map[string]string{"cp-a": cpA}} + offchainTxB := &domain.OffchainTx{ArkTxid: txidB, CheckpointTxs: map[string]string{"cp-b": cpB}} + offchainTxC := &domain.OffchainTx{ArkTxid: txidC, CheckpointTxs: map[string]string{}} + + offchainTxRepo.On("GetOffchainTxsByTxids", ctx, []string{txidA}). + Return([]*domain.OffchainTx{offchainTxA}, nil).Maybe() + offchainTxRepo.On("GetOffchainTxsByTxids", ctx, []string{txidB}). + Return([]*domain.OffchainTx{offchainTxB}, nil).Maybe() + offchainTxRepo.On("GetOffchainTxsByTxids", ctx, []string{txidC}). + Return([]*domain.OffchainTx{offchainTxC}, nil).Maybe() + offchainTxRepo.On("GetOffchainTx", ctx, txidA). - Return(&domain.OffchainTx{CheckpointTxs: map[string]string{"cp-a": cpA}}, nil) + Return(offchainTxA, nil).Maybe() offchainTxRepo.On("GetOffchainTx", ctx, txidB). - Return(&domain.OffchainTx{CheckpointTxs: map[string]string{"cp-b": cpB}}, nil) + Return(offchainTxB, nil).Maybe() offchainTxRepo.On("GetOffchainTx", ctx, txidC). - Return(&domain.OffchainTx{CheckpointTxs: map[string]string{}}, nil) + Return(offchainTxC, nil).Maybe() return Outpoint{Txid: txidA, VOut: 0} } @@ -826,8 +851,11 @@ func TestGetVtxoChain_ShortChainNoToken(t *testing.T) { Return([]domain.Vtxo{vtxo}, nil) markerRepo.On("GetVtxosByMarker", ctx, mock.Anything). Return([]domain.Vtxo{}, nil).Maybe() + offchainTxA := &domain.OffchainTx{ArkTxid: txidA, CheckpointTxs: map[string]string{}} + offchainTxRepo.On("GetOffchainTxsByTxids", ctx, []string{txidA}). + Return([]*domain.OffchainTx{offchainTxA}, nil) offchainTxRepo.On("GetOffchainTx", ctx, txidA). - Return(&domain.OffchainTx{CheckpointTxs: map[string]string{}}, nil) + Return(offchainTxA, nil).Maybe() // Page size larger than chain page := &Page{PageSize: 100} @@ -888,7 +916,7 @@ func matchOutpoints(expected ...domain.Outpoint) interface{} { // matchIDs returns a mock.MatchedBy matcher that matches a []string argument // containing exactly the given IDs, regardless of order. This avoids flakes from -// non-deterministic map iteration in preloadVtxosByMarkers. +// non-deterministic map iteration in preloadByMarkers. func matchIDs(expected ...string) interface{} { sorted := make([]string, len(expected)) copy(sorted, expected) @@ -909,7 +937,7 @@ func matchIDs(expected ...string) interface{} { }) } -// TestPreloadVtxosByMarkers_WalksMarkerChain verifies that preloadVtxosByMarkers +// TestPreloadVtxosByMarkers_WalksMarkerChain verifies that preloadByMarkers // follows the marker DAG upward and populates the cache with all discovered VTXOs. func TestPreloadVtxosByMarkers_WalksMarkerChain(t *testing.T) { _, markerRepo, indexer := newChainTestIndexer() @@ -952,7 +980,8 @@ func TestPreloadVtxosByMarkers_WalksMarkerChain(t *testing.T) { }, nil) cache := make(map[string]domain.Vtxo) - err := indexer.preloadVtxosByMarkers(ctx, []domain.Vtxo{vtxoLeaf}, cache) + offchainCache := make(map[string]*domain.OffchainTx) + err := indexer.preloadByMarkers(ctx, []domain.Vtxo{vtxoLeaf}, cache, offchainCache) require.NoError(t, err) // Cache should contain the seed vtxo plus all vtxos from all marker levels. @@ -999,7 +1028,8 @@ func TestPreloadVtxosByMarkers_NoCycleLoop(t *testing.T) { }, nil) cache := make(map[string]domain.Vtxo) - err := indexer.preloadVtxosByMarkers(ctx, []domain.Vtxo{vtxo}, cache) + offchainCache := make(map[string]*domain.OffchainTx) + err := indexer.preloadByMarkers(ctx, []domain.Vtxo{vtxo}, cache, offchainCache) require.NoError(t, err) // Should terminate without looping forever. @@ -1013,7 +1043,7 @@ func TestPreloadVtxosByMarkers_NoCycleLoop(t *testing.T) { } // TestGetVtxoChain_WithMarkers_UsesPreload verifies that GetVtxoChain uses -// preloadVtxosByMarkers when VTXOs have markers, and that the main loop +// preloadByMarkers when VTXOs have markers, and that the main loop // hits the cache instead of making additional DB calls. func TestGetVtxoChain_WithMarkers_UsesPreload(t *testing.T) { vtxoRepo, markerRepo, offchainTxRepo, indexer := newChainTestIndexerWithOffchain() diff --git a/internal/core/domain/offchain_tx_repo.go b/internal/core/domain/offchain_tx_repo.go index 87615d7a1..094bad62c 100644 --- a/internal/core/domain/offchain_tx_repo.go +++ b/internal/core/domain/offchain_tx_repo.go @@ -5,5 +5,6 @@ import "context" type OffchainTxRepository interface { AddOrUpdateOffchainTx(ctx context.Context, offchainTx *OffchainTx) error GetOffchainTx(ctx context.Context, txid string) (*OffchainTx, error) + GetOffchainTxsByTxids(ctx context.Context, txids []string) ([]*OffchainTx, error) Close() } diff --git a/internal/infrastructure/db/badger/ark_repo.go b/internal/infrastructure/db/badger/ark_repo.go index 3bb6c34ef..0ca0a92cd 100644 --- a/internal/infrastructure/db/badger/ark_repo.go +++ b/internal/infrastructure/db/badger/ark_repo.go @@ -5,6 +5,7 @@ import ( "errors" "fmt" "path/filepath" + "strings" "time" "github.com/arkade-os/arkd/internal/core/domain" @@ -234,6 +235,28 @@ func (r *arkRepository) GetOffchainTx( return r.getOffchainTx(ctx, txid) } +func (r *arkRepository) GetOffchainTxsByTxids( + ctx context.Context, txids []string, +) ([]*domain.OffchainTx, error) { + if len(txids) == 0 { + return []*domain.OffchainTx{}, nil + } + + txs := make([]*domain.OffchainTx, 0, len(txids)) + for _, txid := range txids { + tx, err := r.getOffchainTx(ctx, txid) + if err != nil { + if strings.Contains(err.Error(), "not found") { + continue + } + return nil, err + } + txs = append(txs, tx) + } + + return txs, nil +} + func (r *arkRepository) Close() { // nolint r.store.Close() diff --git a/internal/infrastructure/db/postgres/migration/20260409140000_checkpoint_tx_offchain_txid_index.down.sql b/internal/infrastructure/db/postgres/migration/20260409140000_checkpoint_tx_offchain_txid_index.down.sql new file mode 100644 index 000000000..3bf97317f --- /dev/null +++ b/internal/infrastructure/db/postgres/migration/20260409140000_checkpoint_tx_offchain_txid_index.down.sql @@ -0,0 +1 @@ +DROP INDEX IF EXISTS idx_checkpoint_tx_offchain_txid; diff --git a/internal/infrastructure/db/postgres/migration/20260409140000_checkpoint_tx_offchain_txid_index.up.sql b/internal/infrastructure/db/postgres/migration/20260409140000_checkpoint_tx_offchain_txid_index.up.sql new file mode 100644 index 000000000..4fddcadc2 --- /dev/null +++ b/internal/infrastructure/db/postgres/migration/20260409140000_checkpoint_tx_offchain_txid_index.up.sql @@ -0,0 +1,2 @@ +CREATE INDEX IF NOT EXISTS idx_checkpoint_tx_offchain_txid + ON checkpoint_tx (offchain_txid); diff --git a/internal/infrastructure/db/postgres/offchain_tx_repo.go b/internal/infrastructure/db/postgres/offchain_tx_repo.go index 9f41ea76c..a45be1dbc 100644 --- a/internal/infrastructure/db/postgres/offchain_tx_repo.go +++ b/internal/infrastructure/db/postgres/offchain_tx_repo.go @@ -114,6 +114,62 @@ func (v *offchainTxRepository) GetOffchainTx( }, nil } +func (v *offchainTxRepository) GetOffchainTxsByTxids( + ctx context.Context, txids []string, +) ([]*domain.OffchainTx, error) { + if len(txids) == 0 { + return []*domain.OffchainTx{}, nil + } + + rows, err := v.querier.SelectOffchainTxsByTxids(ctx, txids) + if err != nil { + return nil, err + } + + grouped := make(map[string][]queries.OffchainTxVw) + for _, row := range rows { + grouped[row.OffchainTxVw.Txid] = append(grouped[row.OffchainTxVw.Txid], row.OffchainTxVw) + } + + txs := make([]*domain.OffchainTx, 0, len(grouped)) + for _, vws := range grouped { + vt := vws[0] + checkpointTxs := make(map[string]string) + commitmentTxids := make(map[string]string) + rootCommitmentTxId := "" + for _, vw := range vws { + if vw.CheckpointTxid.Valid && vw.CheckpointTx.Valid { + checkpointTxs[vw.CheckpointTxid.String] = vw.CheckpointTx.String + commitmentTxids[vw.CheckpointTxid.String] = vw.CommitmentTxid.String + if vw.IsRootCommitmentTxid.Valid && vw.IsRootCommitmentTxid.Bool { + rootCommitmentTxId = vw.CommitmentTxid.String + } + } + } + stage := domain.Stage{Code: int(vt.StageCode)} + if vt.FailReason.String != "" { + stage.Failed = true + } + if domain.OffchainTxStage(vt.StageCode) == domain.OffchainTxFinalizedStage { + stage.Ended = true + } + txs = append(txs, &domain.OffchainTx{ + ArkTxid: vt.Txid, + ArkTx: vt.Tx, + StartingTimestamp: vt.StartingTimestamp, + EndingTimestamp: vt.EndingTimestamp, + ExpiryTimestamp: vt.ExpiryTimestamp, + FailReason: vt.FailReason.String, + Stage: stage, + CheckpointTxs: checkpointTxs, + CommitmentTxids: commitmentTxids, + RootCommitmentTxId: rootCommitmentTxId, + }) + } + + return txs, nil +} + func (v *offchainTxRepository) Close() { _ = v.db.Close() } diff --git a/internal/infrastructure/db/postgres/sqlc/queries/query.sql.go b/internal/infrastructure/db/postgres/sqlc/queries/query.sql.go index 67b7b1060..c3d8c4dc5 100644 --- a/internal/infrastructure/db/postgres/sqlc/queries/query.sql.go +++ b/internal/infrastructure/db/postgres/sqlc/queries/query.sql.go @@ -876,6 +876,50 @@ func (q *Queries) SelectOffchainTx(ctx context.Context, txid string) ([]SelectOf return items, nil } +const selectOffchainTxsByTxids = `-- name: SelectOffchainTxsByTxids :many +SELECT offchain_tx_vw.txid, offchain_tx_vw.tx, offchain_tx_vw.starting_timestamp, offchain_tx_vw.ending_timestamp, offchain_tx_vw.expiry_timestamp, offchain_tx_vw.fail_reason, offchain_tx_vw.stage_code, offchain_tx_vw.checkpoint_txid, offchain_tx_vw.checkpoint_tx, offchain_tx_vw.commitment_txid, offchain_tx_vw.is_root_commitment_txid, offchain_tx_vw.offchain_txid FROM offchain_tx_vw WHERE txid = ANY($1::varchar[]) AND COALESCE(fail_reason, '') = '' +` + +type SelectOffchainTxsByTxidsRow struct { + OffchainTxVw OffchainTxVw +} + +func (q *Queries) SelectOffchainTxsByTxids(ctx context.Context, txids []string) ([]SelectOffchainTxsByTxidsRow, error) { + rows, err := q.db.QueryContext(ctx, selectOffchainTxsByTxids, pq.Array(txids)) + if err != nil { + return nil, err + } + defer rows.Close() + var items []SelectOffchainTxsByTxidsRow + for rows.Next() { + var i SelectOffchainTxsByTxidsRow + if err := rows.Scan( + &i.OffchainTxVw.Txid, + &i.OffchainTxVw.Tx, + &i.OffchainTxVw.StartingTimestamp, + &i.OffchainTxVw.EndingTimestamp, + &i.OffchainTxVw.ExpiryTimestamp, + &i.OffchainTxVw.FailReason, + &i.OffchainTxVw.StageCode, + &i.OffchainTxVw.CheckpointTxid, + &i.OffchainTxVw.CheckpointTx, + &i.OffchainTxVw.CommitmentTxid, + &i.OffchainTxVw.IsRootCommitmentTxid, + &i.OffchainTxVw.OffchainTxid, + ); err != nil { + return nil, err + } + items = append(items, i) + } + if err := rows.Close(); err != nil { + return nil, err + } + if err := rows.Err(); err != nil { + return nil, err + } + return items, nil +} + const selectPendingSpentVtxo = `-- name: SelectPendingSpentVtxo :many SELECT v.txid, v.vout, v.pubkey, v.amount, v.expires_at, v.created_at, v.commitment_txid, v.spent_by, v.spent, v.unrolled, v.preconfirmed, v.settled_by, v.ark_txid, v.intent_id, v.updated_at, v.depth, v.markers, v.commitments, v.swept, v.asset_id, v.asset_amount FROM vtxo_vw v diff --git a/internal/infrastructure/db/postgres/sqlc/query.sql b/internal/infrastructure/db/postgres/sqlc/query.sql index 47c335993..33747e6e6 100644 --- a/internal/infrastructure/db/postgres/sqlc/query.sql +++ b/internal/infrastructure/db/postgres/sqlc/query.sql @@ -277,6 +277,9 @@ WHERE EXISTS ( -- name: SelectOffchainTx :many SELECT sqlc.embed(offchain_tx_vw) FROM offchain_tx_vw WHERE txid = @txid AND COALESCE(fail_reason, '') = ''; +-- name: SelectOffchainTxsByTxids :many +SELECT sqlc.embed(offchain_tx_vw) FROM offchain_tx_vw WHERE txid = ANY(@txids::varchar[]) AND COALESCE(fail_reason, '') = ''; + -- name: SelectLatestScheduledSession :one SELECT * FROM scheduled_session ORDER BY updated_at DESC LIMIT 1; diff --git a/internal/infrastructure/db/service_test.go b/internal/infrastructure/db/service_test.go index 5b23d0369..5862a12be 100644 --- a/internal/infrastructure/db/service_test.go +++ b/internal/infrastructure/db/service_test.go @@ -3434,6 +3434,67 @@ func testOffchainTxRepository(t *testing.T, svc ports.RepoManager) { require.NotNil(t, offchainTx) require.True(t, gotOffchainTx.IsFinalized()) require.Condition(t, offchainTxMatch(*offchainTx, *gotOffchainTx)) + + bulkFetchedTxs, err := repo.GetOffchainTxsByTxids(ctx, []string{arkTxid}) + require.NoError(t, err) + require.Len(t, bulkFetchedTxs, 1) + require.Equal(t, arkTxid, bulkFetchedTxs[0].ArkTxid) + + bulkFetchedTxs, err = repo.GetOffchainTxsByTxids(ctx, []string{"missing-txid"}) + require.NoError(t, err) + require.Empty(t, bulkFetchedTxs) + + // Insert a second offchain tx so we can exercise multi-txid bulk fetch. + secondArkTxid := txidb + secondCheckpointTxid := "0000000000000000000000000000000000000000000000000000000000000005" + secondCheckpointPtx := "cHNldP8BAgQCAAAAAQQBAAEFAQABBgEDAfsEAgAAAAA=signed-2" + secondEvents := []domain.Event{ + domain.OffchainTxRequested{ + OffchainTxEvent: domain.OffchainTxEvent{ + Id: secondArkTxid, + Type: domain.EventTypeOffchainTxRequested, + }, + StartingTimestamp: now.Unix(), + }, + domain.OffchainTxAccepted{ + OffchainTxEvent: domain.OffchainTxEvent{ + Id: secondArkTxid, + Type: domain.EventTypeOffchainTxAccepted, + }, + CommitmentTxids: map[string]string{ + secondCheckpointTxid: rootCommitmentTxid, + }, + SignedCheckpointTxs: map[string]string{ + secondCheckpointTxid: secondCheckpointPtx, + }, + RootCommitmentTxid: rootCommitmentTxid, + }, + } + secondOffchainTx := domain.NewOffchainTxFromEvents(secondEvents) + require.NoError(t, repo.AddOrUpdateOffchainTx(ctx, secondOffchainTx)) + + // Multi-txid fetch returns both, plus tolerates a missing entry. + bulkFetchedTxs, err = repo.GetOffchainTxsByTxids( + ctx, []string{arkTxid, secondArkTxid, "missing-txid"}, + ) + require.NoError(t, err) + require.Len(t, bulkFetchedTxs, 2) + + got := make(map[string]*domain.OffchainTx, len(bulkFetchedTxs)) + for _, tx := range bulkFetchedTxs { + got[tx.ArkTxid] = tx + } + require.Contains(t, got, arkTxid) + require.Contains(t, got, secondArkTxid) + + // Each result must carry its own checkpoint mapping — guards the + // row-grouping logic against cross-txid contamination. + require.Contains(t, got[arkTxid].CheckpointTxs, checkpointTxid1) + require.Contains(t, got[arkTxid].CheckpointTxs, checkpointTxid2) + require.NotContains(t, got[arkTxid].CheckpointTxs, secondCheckpointTxid) + require.Contains(t, got[secondArkTxid].CheckpointTxs, secondCheckpointTxid) + require.NotContains(t, got[secondArkTxid].CheckpointTxs, checkpointTxid1) + require.NotContains(t, got[secondArkTxid].CheckpointTxs, checkpointTxid2) }) } diff --git a/internal/infrastructure/db/sqlite/migration/20260409140000_checkpoint_tx_offchain_txid_index.down.sql b/internal/infrastructure/db/sqlite/migration/20260409140000_checkpoint_tx_offchain_txid_index.down.sql new file mode 100644 index 000000000..3bf97317f --- /dev/null +++ b/internal/infrastructure/db/sqlite/migration/20260409140000_checkpoint_tx_offchain_txid_index.down.sql @@ -0,0 +1 @@ +DROP INDEX IF EXISTS idx_checkpoint_tx_offchain_txid; diff --git a/internal/infrastructure/db/sqlite/migration/20260409140000_checkpoint_tx_offchain_txid_index.up.sql b/internal/infrastructure/db/sqlite/migration/20260409140000_checkpoint_tx_offchain_txid_index.up.sql new file mode 100644 index 000000000..4fddcadc2 --- /dev/null +++ b/internal/infrastructure/db/sqlite/migration/20260409140000_checkpoint_tx_offchain_txid_index.up.sql @@ -0,0 +1,2 @@ +CREATE INDEX IF NOT EXISTS idx_checkpoint_tx_offchain_txid + ON checkpoint_tx (offchain_txid); diff --git a/internal/infrastructure/db/sqlite/offchain_tx_repo.go b/internal/infrastructure/db/sqlite/offchain_tx_repo.go index c9df73897..3f30b7d85 100644 --- a/internal/infrastructure/db/sqlite/offchain_tx_repo.go +++ b/internal/infrastructure/db/sqlite/offchain_tx_repo.go @@ -9,6 +9,11 @@ import ( "github.com/arkade-os/arkd/internal/infrastructure/db/sqlite/sqlc/queries" ) +// sqliteMaxBulkTxids caps the per-query batch for GetOffchainTxsByTxids to stay +// well under SQLITE_MAX_VARIABLE_NUMBER (default 999 on SQLite < 3.32). The +// SLICE expansion in the generated query emits one bound parameter per txid. +const sqliteMaxBulkTxids = 500 + type offchainTxRepository struct { db *sql.DB querier *queries.Queries @@ -114,6 +119,67 @@ func (v *offchainTxRepository) GetOffchainTx( }, nil } +func (v *offchainTxRepository) GetOffchainTxsByTxids( + ctx context.Context, txids []string, +) ([]*domain.OffchainTx, error) { + if len(txids) == 0 { + return []*domain.OffchainTx{}, nil + } + + grouped := make(map[string][]queries.OffchainTxVw) + for start := 0; start < len(txids); start += sqliteMaxBulkTxids { + end := min(start+sqliteMaxBulkTxids, len(txids)) + rows, err := v.querier.SelectOffchainTxsByTxids(ctx, txids[start:end]) + if err != nil { + return nil, err + } + for _, row := range rows { + grouped[row.OffchainTxVw.Txid] = append( + grouped[row.OffchainTxVw.Txid], + row.OffchainTxVw, + ) + } + } + + txs := make([]*domain.OffchainTx, 0, len(grouped)) + for _, vws := range grouped { + vt := vws[0] + checkpointTxs := make(map[string]string) + commitmentTxids := make(map[string]string) + rootCommitmentTxId := "" + for _, vw := range vws { + if vw.CheckpointTxid != "" && vw.CheckpointTx != "" { + checkpointTxs[vw.CheckpointTxid] = vw.CheckpointTx + commitmentTxids[vw.CheckpointTxid] = vw.CommitmentTxid.String + if vw.IsRootCommitmentTxid.Bool { + rootCommitmentTxId = vw.CommitmentTxid.String + } + } + } + stage := domain.Stage{Code: int(vt.StageCode)} + if vt.FailReason.String != "" { + stage.Failed = true + } + if domain.OffchainTxStage(vt.StageCode) == domain.OffchainTxFinalizedStage { + stage.Ended = true + } + txs = append(txs, &domain.OffchainTx{ + ArkTxid: vt.Txid, + ArkTx: vt.Tx, + StartingTimestamp: vt.StartingTimestamp, + EndingTimestamp: vt.EndingTimestamp, + ExpiryTimestamp: vt.ExpiryTimestamp, + FailReason: vt.FailReason.String, + Stage: stage, + CheckpointTxs: checkpointTxs, + CommitmentTxids: commitmentTxids, + RootCommitmentTxId: rootCommitmentTxId, + }) + } + + return txs, nil +} + func (v *offchainTxRepository) Close() { _ = v.db.Close() } diff --git a/internal/infrastructure/db/sqlite/sqlc/queries/query.sql.go b/internal/infrastructure/db/sqlite/sqlc/queries/query.sql.go index 7bf84f46b..15adcd901 100644 --- a/internal/infrastructure/db/sqlite/sqlc/queries/query.sql.go +++ b/internal/infrastructure/db/sqlite/sqlc/queries/query.sql.go @@ -936,6 +936,60 @@ func (q *Queries) SelectOffchainTx(ctx context.Context, txid string) ([]SelectOf return items, nil } +const selectOffchainTxsByTxids = `-- name: SelectOffchainTxsByTxids :many +SELECT offchain_tx_vw.txid, offchain_tx_vw.tx, offchain_tx_vw.starting_timestamp, offchain_tx_vw.ending_timestamp, offchain_tx_vw.expiry_timestamp, offchain_tx_vw.fail_reason, offchain_tx_vw.stage_code, offchain_tx_vw.checkpoint_txid, offchain_tx_vw.checkpoint_tx, offchain_tx_vw.commitment_txid, offchain_tx_vw.is_root_commitment_txid, offchain_tx_vw.offchain_txid FROM offchain_tx_vw WHERE txid IN (/*SLICE:txids*/?) AND COALESCE(fail_reason, '') = '' +` + +type SelectOffchainTxsByTxidsRow struct { + OffchainTxVw OffchainTxVw +} + +func (q *Queries) SelectOffchainTxsByTxids(ctx context.Context, txids []string) ([]SelectOffchainTxsByTxidsRow, error) { + query := selectOffchainTxsByTxids + var queryParams []interface{} + if len(txids) > 0 { + for _, v := range txids { + queryParams = append(queryParams, v) + } + query = strings.Replace(query, "/*SLICE:txids*/?", strings.Repeat(",?", len(txids))[1:], 1) + } else { + query = strings.Replace(query, "/*SLICE:txids*/?", "NULL", 1) + } + rows, err := q.db.QueryContext(ctx, query, queryParams...) + if err != nil { + return nil, err + } + defer rows.Close() + var items []SelectOffchainTxsByTxidsRow + for rows.Next() { + var i SelectOffchainTxsByTxidsRow + if err := rows.Scan( + &i.OffchainTxVw.Txid, + &i.OffchainTxVw.Tx, + &i.OffchainTxVw.StartingTimestamp, + &i.OffchainTxVw.EndingTimestamp, + &i.OffchainTxVw.ExpiryTimestamp, + &i.OffchainTxVw.FailReason, + &i.OffchainTxVw.StageCode, + &i.OffchainTxVw.CheckpointTxid, + &i.OffchainTxVw.CheckpointTx, + &i.OffchainTxVw.CommitmentTxid, + &i.OffchainTxVw.IsRootCommitmentTxid, + &i.OffchainTxVw.OffchainTxid, + ); err != nil { + return nil, err + } + items = append(items, i) + } + if err := rows.Close(); err != nil { + return nil, err + } + if err := rows.Err(); err != nil { + return nil, err + } + return items, nil +} + const selectPendingSpentVtxo = `-- name: SelectPendingSpentVtxo :many SELECT v.txid, v.vout, v.pubkey, v.amount, v.expires_at, v.created_at, v.commitment_txid, v.spent_by, v.spent, v.unrolled, v.preconfirmed, v.settled_by, v.ark_txid, v.intent_id, v.updated_at, v.depth, v.markers, v.commitments, v.swept, v.asset_id, v.asset_amount FROM vtxo_vw v diff --git a/internal/infrastructure/db/sqlite/sqlc/query.sql b/internal/infrastructure/db/sqlite/sqlc/query.sql index 47b4badea..204c96fe8 100644 --- a/internal/infrastructure/db/sqlite/sqlc/query.sql +++ b/internal/infrastructure/db/sqlite/sqlc/query.sql @@ -283,6 +283,9 @@ WHERE EXISTS ( -- name: SelectOffchainTx :many SELECT sqlc.embed(offchain_tx_vw) FROM offchain_tx_vw WHERE txid = @txid AND COALESCE(fail_reason, '') = ''; +-- name: SelectOffchainTxsByTxids :many +SELECT sqlc.embed(offchain_tx_vw) FROM offchain_tx_vw WHERE txid IN (sqlc.slice('txids')) AND COALESCE(fail_reason, '') = ''; + -- name: SelectLatestScheduledSession :one SELECT * FROM scheduled_session ORDER BY updated_at DESC LIMIT 1; diff --git a/internal/test/e2e/vtxo_chain_test.go b/internal/test/e2e/vtxo_chain_test.go new file mode 100644 index 000000000..575590588 --- /dev/null +++ b/internal/test/e2e/vtxo_chain_test.go @@ -0,0 +1,176 @@ +package e2e_test + +import ( + "bytes" + "encoding/hex" + "encoding/json" + "flag" + "fmt" + "net/http" + "sync" + "testing" + "time" + + arksdk "github.com/arkade-os/arkd/pkg/client-lib" + grpcindexer "github.com/arkade-os/arkd/pkg/client-lib/indexer/grpc" + "github.com/arkade-os/arkd/pkg/client-lib/store" + "github.com/arkade-os/arkd/pkg/client-lib/types" + "github.com/btcsuite/btcd/btcec/v2" + "github.com/stretchr/testify/require" +) + +var ( + chainLength = flag.Int("chain-length", 10, "Number of self-send hops in the VTXO chain") + initialAmount = flag.Int("initial-amount", 1000, "Initial funding amount in satoshis") + arkServerUrl = flag.String("server-url", serverUrl, "Ark server gRPC address") + arkAdminUrl = flag.String("admin-url", adminUrl, "Ark admin HTTP address") + walletSeed = flag.String("seed", "", "Wallet private key hex (random if empty)") + skipChain = flag.Bool("skip-chain", false, "Skip chain creation, only run GetVtxoChain on existing wallet") +) + +// TestVtxoChain creates a long VTXO chain by repeatedly self-sending. +// Run with: +// +// go test -v -run TestVtxoChain -args -chain-length=50 -initial-amount=10000 +func TestVtxoChain(t *testing.T) { + if !flag.Parsed() { + flag.Parse() + } + + ctx := t.Context() + + appDataStore, err := store.NewStore(store.Config{ + ConfigStoreType: types.InMemoryStore, + }) + require.NoError(t, err) + + client, err := arksdk.NewArkClient(appDataStore) + require.NoError(t, err) + t.Cleanup(client.Stop) + + seed := *walletSeed + if seed == "" { + privkey, err := btcec.NewPrivateKey() + require.NoError(t, err) + seed = hex.EncodeToString(privkey.Serialize()) + } + t.Logf("wallet seed: %s", seed) + + err = client.Init(ctx, arksdk.InitArgs{ + WalletType: arksdk.SingleKeyWallet, + ServerUrl: *arkServerUrl, + Password: password, + Seed: seed, + ExplorerURL: explorerUrl, + }) + require.NoError(t, err) + + err = client.Unlock(ctx, password) + require.NoError(t, err) + + _, offchainAddr, _, err := client.Receive(ctx) + require.NoError(t, err) + + if !*skipChain { + // Fund the client offchain via admin note. + note := chainGenerateNote(t, uint64(*initialAmount)) + + wg := &sync.WaitGroup{} + var notifyErr error + wg.Go(func() { + _, notifyErr = client.NotifyIncomingFunds(ctx, offchainAddr.Address) + }) + + redeemTxid, err := client.RedeemNotes(ctx, []string{note}) + require.NoError(t, err) + require.NotEmpty(t, redeemTxid) + + wg.Wait() + require.NoError(t, notifyErr) + + time.Sleep(time.Second) + + spendable, _, err := client.ListVtxos(ctx) + require.NoError(t, err) + require.NotEmpty(t, spendable, "no spendable VTXOs after faucet") + + start := time.Now() + hops := 0 + + for i := range *chainLength { + spendable, _, err = client.ListVtxos(ctx) + require.NoError(t, err) + for len(spendable) == 0 { + spendable, _, err = client.ListVtxos(ctx) + require.NoError(t, err) + } + require.Len(t, spendable, 1) + tip := spendable[0] + + wg := &sync.WaitGroup{} + var notifyErr error + wg.Go(func() { + _, notifyErr = client.NotifyIncomingFunds(ctx, offchainAddr.Address) + }) + + res, err := client.SendOffChain(ctx, []types.Receiver{{ + To: offchainAddr.Address, + Amount: tip.Amount, + }}) + require.NoError(t, err) + + wg.Wait() + require.NoError(t, notifyErr) + + hops++ + t.Logf("hop %d: txid=%s", i, res.Txid) + } + + chainElapsed := time.Since(start) + t.Logf("chain built: %d hops in %s", hops, chainElapsed) + + time.Sleep(2 * time.Second) + } + + spendable, _, err := client.ListVtxos(ctx) + require.NoError(t, err) + tip := spendable[0] + + // Benchmark GetVtxoChain on the last VTXO in the chain. + last := types.Outpoint{Txid: tip.Txid, VOut: tip.VOut} + idx, err := grpcindexer.NewClient(*arkServerUrl) + require.NoError(t, err) + + getChainStart := time.Now() + resp, err := idx.GetVtxoChain(ctx, last) + getChainElapsed := time.Since(getChainStart) + require.NoError(t, err) + + t.Logf("GetVtxoChain: %d entries in %s (tip=%s:%d)", len(resp.Chain), getChainElapsed, last.Txid, last.VOut) +} + +func chainGenerateNote(t *testing.T, amount uint64) string { + t.Helper() + + httpClient := &http.Client{Timeout: 15 * time.Second} + + reqBody := bytes.NewReader([]byte(fmt.Sprintf(`{"amount": "%d"}`, amount))) + req, err := http.NewRequest("POST", *arkAdminUrl+"/v1/admin/note", reqBody) + require.NoError(t, err) + + req.Header.Set("Authorization", "Basic YWRtaW46YWRtaW4=") + req.Header.Set("Content-Type", "application/json") + + resp, err := httpClient.Do(req) + require.NoError(t, err) + defer resp.Body.Close() + + var noteResp struct { + Notes []string `json:"notes"` + } + err = json.NewDecoder(resp.Body).Decode(¬eResp) + require.NoError(t, err) + require.NotEmpty(t, noteResp.Notes) + + return noteResp.Notes[0] +}