Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 36 additions & 3 deletions internal/core/application/indexer.go
Original file line number Diff line number Diff line change
Expand Up @@ -569,6 +569,7 @@ func (i *indexerService) walkVtxoChain(
chain := make([]ChainTx, 0)
nextVtxos := frontier
visited := make(map[string]bool)
offchainTxCache := make(map[string]*domain.OffchainTx)
allOutpoints := make([]Outpoint, 0)

// Lazy cache for VTXOs loaded during this page.
Expand Down Expand Up @@ -601,6 +602,33 @@ func (i *indexerService) walkVtxoChain(
return nil, nil, "", fmt.Errorf("vtxo not found for outpoint: %v", nextVtxos)
}

missingOffchainTxids := make(map[string]struct{})
for _, vtxo := range vtxos {
if !vtxo.Preconfirmed {
continue
}
if _, ok := offchainTxCache[vtxo.Txid]; ok {
continue
}
missingOffchainTxids[vtxo.Txid] = struct{}{}
}

if len(missingOffchainTxids) > 0 {
txids := make([]string, 0, len(missingOffchainTxids))
for txid := range missingOffchainTxids {
txids = append(txids, txid)
}

offchainTxs, err := i.repoManager.OffchainTxs().GetOffchainTxsByTxids(ctx, txids)
if err != nil {
return nil, nil, "", fmt.Errorf("failed to retrieve offchain txs: %s", err)
}

for _, tx := range offchainTxs {
offchainTxCache[tx.ArkTxid] = tx
}
}

newNextVtxos := make([]domain.Outpoint, 0)
for _, vtxo := range vtxos {
key := vtxo.Outpoint.String()
Expand Down Expand Up @@ -630,9 +658,14 @@ func (i *indexerService) walkVtxoChain(
// also, we have to populate the newNextVtxos with the checkpoints inputs
// in order to continue the chain in the next iteration
if vtxo.Preconfirmed {
offchainTx, err := i.repoManager.OffchainTxs().GetOffchainTx(ctx, vtxo.Txid)
if err != nil {
return nil, nil, "", fmt.Errorf("failed to retrieve offchain tx: %s", err)
offchainTx, ok := offchainTxCache[vtxo.Txid]
if !ok {
var err error
offchainTx, err = i.repoManager.OffchainTxs().GetOffchainTx(ctx, vtxo.Txid)
if err != nil {
return nil, nil, "", fmt.Errorf("failed to retrieve offchain tx: %s", err)
}
offchainTxCache[vtxo.Txid] = offchainTx
}

chainTx := ChainTx{
Expand Down
231 changes: 230 additions & 1 deletion internal/core/application/indexer_bench_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,15 @@ import (
"context"
"fmt"
"strings"
"sync/atomic"
"testing"
"time"

"github.com/arkade-os/arkd/internal/core/domain"
"github.com/btcsuite/btcd/btcutil/psbt"
"github.com/btcsuite/btcd/chaincfg/chainhash"
"github.com/btcsuite/btcd/wire"
"github.com/stretchr/testify/require"
)

// Lightweight fake repos for benchmarks — no testify/mock overhead.
Expand Down Expand Up @@ -92,12 +95,24 @@ func (r *benchOffchainTxRepo) GetOffchainTx(
return &domain.OffchainTx{CheckpointTxs: map[string]string{}}, nil
}

// GetOffchainTxsByTxids returns the subset of the requested txids that are
// present in the in-memory map; unknown txids are silently skipped.
func (r *benchOffchainTxRepo) GetOffchainTxsByTxids(
	_ context.Context, txids []string,
) ([]*domain.OffchainTx, error) {
	found := make([]*domain.OffchainTx, 0, len(txids))
	for _, id := range txids {
		tx, ok := r.txs[id]
		if !ok {
			continue
		}
		found = append(found, tx)
	}
	return found, nil
}

func (r *benchOffchainTxRepo) Close() {}

type benchRepoManager struct {
vtxoRepo *benchVtxoRepo
markerRepo *benchMarkerRepo
offchainRepo *benchOffchainTxRepo
offchainRepo domain.OffchainTxRepository
}

func (m *benchRepoManager) Events() domain.EventRepository { return nil }
Expand Down Expand Up @@ -414,3 +429,217 @@ func BenchmarkCheckpointPSBTParse(b *testing.B) {
}
}
}

// countingOffchainTxRepo wraps benchOffchainTxRepo and counts calls.
type countingOffchainTxRepo struct {
	inner *benchOffchainTxRepo // backing fake that actually serves the data

	singleCalls atomic.Int64 // number of GetOffchainTx invocations
	bulkCalls   atomic.Int64 // number of GetOffchainTxsByTxids invocations

	// latencyPerCall, when > 0, is slept once per repository call to
	// simulate one DB round-trip.
	latencyPerCall time.Duration
}

// GetOffchainTx records one individual lookup, optionally simulates a DB
// round-trip, then delegates to the wrapped fake repository.
func (r *countingOffchainTxRepo) GetOffchainTx(
	ctx context.Context, txid string,
) (*domain.OffchainTx, error) {
	r.singleCalls.Add(1)
	if d := r.latencyPerCall; d > 0 {
		time.Sleep(d)
	}
	return r.inner.GetOffchainTx(ctx, txid)
}

// GetOffchainTxsByTxids records one bulk lookup and delegates to the wrapped
// fake. The simulated latency is paid once per call — not per txid — to
// mirror a single DB round-trip for the whole batch.
func (r *countingOffchainTxRepo) GetOffchainTxsByTxids(
	ctx context.Context, txids []string,
) ([]*domain.OffchainTx, error) {
	r.bulkCalls.Add(1)
	if d := r.latencyPerCall; d > 0 {
		time.Sleep(d)
	}
	return r.inner.GetOffchainTxsByTxids(ctx, txids)
}

// AddOrUpdateOffchainTx is a no-op: these benchmarks never write through the
// repository.
func (r *countingOffchainTxRepo) AddOrUpdateOffchainTx(
	_ context.Context, _ *domain.OffchainTx,
) error {
	return nil
}

func (r *countingOffchainTxRepo) Close() {}

// reset zeroes both call counters so a fresh measurement starts from zero.
func (r *countingOffchainTxRepo) reset() {
	r.bulkCalls.Store(0)
	r.singleCalls.Store(0)
}

// noBulkOffchainTxRepo is like benchOffchainTxRepo but GetOffchainTxsByTxids
// always returns empty, forcing the fallback to individual GetOffchainTx calls.
// This simulates the pre-optimization behavior.
type noBulkOffchainTxRepo struct {
	// Embedding keeps the single-call path (GetOffchainTx) counting and
	// sleeping exactly as the bulk-capable wrapper does.
	countingOffchainTxRepo
}

// GetOffchainTxsByTxids still counts the call but always yields an empty
// result, so the caller must fall back to per-txid GetOffchainTx lookups.
// NOTE(review): unlike the embedded wrapper, no simulated latency is paid
// here — presumably because pre-optimization code never issued a bulk query;
// confirm this is intentional.
func (r *noBulkOffchainTxRepo) GetOffchainTxsByTxids(
	_ context.Context, _ []string,
) ([]*domain.OffchainTx, error) {
	r.bulkCalls.Add(1)
	empty := make([]*domain.OffchainTx, 0)
	return empty, nil
}

// TestBulkOffchainTxReducesDBCalls verifies that the bulk prefetch reduces the
// number of DB round-trips. Uses a fanout tree where each iteration processes
// multiple VTXOs — bulk fetches all offchain txs in one call per iteration
// instead of one call per VTXO.
func TestBulkOffchainTxReducesDBCalls(t *testing.T) {
	const depth = 8 // 2^9 - 1 = 511 VTXOs
	ctx := context.Background()

	// Build fanout tree data (reuse the helper's repo setup).
	n := (1 << (depth + 1)) - 1
	vtxoRepo := &benchVtxoRepo{vtxos: make(map[string]domain.Vtxo, n)}
	innerRepo := &benchOffchainTxRepo{txs: make(map[string]*domain.OffchainTx, n)}

	// Implicit binary-heap layout: node i has children 2i+1 and 2i+2.
	for i := 0; i < n; i++ {
		tid := benchTxid(i)
		// One preconfirmed VTXO per tree node, keyed by "txid:vout".
		vtxoRepo.vtxos[fmt.Sprintf("%s:0", tid)] = domain.Vtxo{
			Outpoint:     domain.Outpoint{Txid: tid, VOut: 0},
			Preconfirmed: true,
			ExpiresAt:    int64(1000 + i),
		}
		left := 2*i + 1
		right := 2*i + 2
		if left < n && right < n {
			// Interior node: its offchain tx carries one checkpoint PSBT per
			// child, so the chain walk fans out to both subtrees.
			innerRepo.txs[tid] = &domain.OffchainTx{
				ArkTxid: tid,
				CheckpointTxs: map[string]string{
					fmt.Sprintf("cp-l-%d", i): benchCheckpointPSBT(benchTxid(left), 0),
					fmt.Sprintf("cp-r-%d", i): benchCheckpointPSBT(benchTxid(right), 0),
				},
			}
		} else {
			// Leaf node: no checkpoints, so the walk stops here.
			innerRepo.txs[tid] = &domain.OffchainTx{
				ArkTxid:       tid,
				CheckpointTxs: map[string]string{},
			}
		}
	}

	start := Outpoint{Txid: benchTxid(0), VOut: 0}

	// With bulk fetch (current behavior).
	bulkRepo := &countingOffchainTxRepo{inner: innerRepo}
	svc := &indexerService{repoManager: &benchRepoManager{
		vtxoRepo: vtxoRepo, offchainRepo: bulkRepo,
	}}
	resp, err := svc.GetVtxoChain(ctx, "", start, nil, "")
	require.NoError(t, err)

	bulkSingle := bulkRepo.singleCalls.Load()
	bulkBulk := bulkRepo.bulkCalls.Load()

	// Without bulk fetch (simulated pre-optimization: bulk returns empty).
	noBulkRepo := &noBulkOffchainTxRepo{countingOffchainTxRepo{inner: innerRepo}}
	svc2 := &indexerService{repoManager: &benchRepoManager{
		vtxoRepo: vtxoRepo, offchainRepo: noBulkRepo,
	}}
	resp2, err := svc2.GetVtxoChain(ctx, "", start, nil, "")
	require.NoError(t, err)
	// Both paths must produce the same chain; only the call pattern differs.
	require.Equal(t, len(resp.Chain), len(resp2.Chain))

	noBulkSingle := noBulkRepo.singleCalls.Load()

	t.Logf("fanout tree: depth=%d, %d VTXOs", depth, n)
	t.Logf("WITH bulk: %d bulk calls, %d individual calls (total round-trips: %d)",
		bulkBulk, bulkSingle, bulkBulk+bulkSingle)
	t.Logf("WITHOUT bulk: %d individual calls (total round-trips: %d)",
		noBulkSingle, noBulkSingle)

	// With bulk fetch, individual calls should be 0 (all served from cache).
	require.Zero(t, bulkSingle, "bulk prefetch should eliminate individual GetOffchainTx calls")
	// Bulk calls = depth+1 iterations (one per tree level), much fewer than N VTXOs.
	// NOTE(review): the assertion allows <= depth+1 while its failure message
	// says "should equal" — the message is slightly looser than the check.
	require.LessOrEqual(t, bulkBulk, int64(depth+1),
		"bulk calls should equal tree depth (one per iteration)")
	// Without bulk, individual calls == N (one per preconfirmed VTXO).
	require.Equal(t, int64(n), noBulkSingle,
		"without bulk, every VTXO triggers an individual call")
}

// BenchmarkOffchainTxBulkVsSingle compares chain traversal with and without
// the bulk offchain tx prefetch, using simulated DB latency to make the
// round-trip reduction visible in wall-clock time. Uses a fanout tree
// (depth 8, 511 VTXOs) where each iteration processes an exponentially
// growing number of VTXOs — the bulk path does 9 round-trips vs 511
// individual calls without it.
func BenchmarkOffchainTxBulkVsSingle(b *testing.B) {
	const depth = 8
	const simulatedLatency = 50 * time.Microsecond

	n := (1 << (depth + 1)) - 1
	vtxoRepo := &benchVtxoRepo{vtxos: make(map[string]domain.Vtxo, n)}
	innerRepo := &benchOffchainTxRepo{txs: make(map[string]*domain.OffchainTx, n)}

	// Same implicit binary-tree fixture as TestBulkOffchainTxReducesDBCalls:
	// interior nodes carry one checkpoint PSBT per child, leaves carry none.
	for i := 0; i < n; i++ {
		tid := benchTxid(i)
		vtxoRepo.vtxos[fmt.Sprintf("%s:0", tid)] = domain.Vtxo{
			Outpoint:     domain.Outpoint{Txid: tid, VOut: 0},
			Preconfirmed: true,
			ExpiresAt:    int64(1000 + i),
		}
		left := 2*i + 1
		right := 2*i + 2
		if left < n && right < n {
			innerRepo.txs[tid] = &domain.OffchainTx{
				ArkTxid: tid,
				CheckpointTxs: map[string]string{
					fmt.Sprintf("cp-l-%d", i): benchCheckpointPSBT(benchTxid(left), 0),
					fmt.Sprintf("cp-r-%d", i): benchCheckpointPSBT(benchTxid(right), 0),
				},
			}
		} else {
			innerRepo.txs[tid] = &domain.OffchainTx{
				ArkTxid:       tid,
				CheckpointTxs: map[string]string{},
			}
		}
	}

	start := Outpoint{Txid: benchTxid(0), VOut: 0}
	ctx := context.Background()

	b.Run(fmt.Sprintf("bulk_prefetch/%d_vtxos", n), func(b *testing.B) {
		repo := &countingOffchainTxRepo{inner: innerRepo, latencyPerCall: simulatedLatency}
		svc := &indexerService{repoManager: &benchRepoManager{
			vtxoRepo: vtxoRepo, offchainRepo: repo,
		}}
		// Reset ONCE before the timed loop so the counters accumulate over
		// all b.N iterations; the per-op metrics below divide the totals by
		// b.N. (Resetting inside the loop would leave only the last
		// iteration's counts, underreporting every metric by a factor of b.N.)
		repo.reset()
		b.ReportAllocs()
		b.ResetTimer()
		for i := 0; i < b.N; i++ {
			_, err := svc.GetVtxoChain(ctx, "", start, nil, "")
			if err != nil {
				b.Fatal(err)
			}
		}
		b.StopTimer()
		b.ReportMetric(float64(repo.bulkCalls.Load())/float64(b.N), "bulk_calls/op")
		b.ReportMetric(float64(repo.singleCalls.Load())/float64(b.N), "single_calls/op")
	})

	b.Run(fmt.Sprintf("no_bulk_fallback/%d_vtxos", n), func(b *testing.B) {
		repo := &noBulkOffchainTxRepo{countingOffchainTxRepo{inner: innerRepo, latencyPerCall: simulatedLatency}}
		svc := &indexerService{repoManager: &benchRepoManager{
			vtxoRepo: vtxoRepo, offchainRepo: repo,
		}}
		// Same fix as above: accumulate across iterations, divide once.
		repo.reset()
		b.ReportAllocs()
		b.ResetTimer()
		for i := 0; i < b.N; i++ {
			_, err := svc.GetVtxoChain(ctx, "", start, nil, "")
			if err != nil {
				b.Fatal(err)
			}
		}
		b.StopTimer()
		b.ReportMetric(float64(repo.bulkCalls.Load())/float64(b.N), "bulk_calls/op")
		b.ReportMetric(float64(repo.singleCalls.Load())/float64(b.N), "single_calls/op")
	})
}
Loading