diff --git a/core/ledger/pvtdatastorage/store.go b/core/ledger/pvtdatastorage/store.go index 2f42a7f24ab..6ffe4737c25 100644 --- a/core/ledger/pvtdatastorage/store.go +++ b/core/ledger/pvtdatastorage/store.go @@ -1065,7 +1065,10 @@ func (s *Store) processCollElgEvents() error { s.purgerLock.Unlock() time.Sleep(sleepTime * time.Millisecond) s.purgerLock.Lock() - collItr, err = s.db.GetIterator(nextKey, endKey) + // Re-create the iterator so it reflects any deletes + // made by the purger while the lock was released. + collItr.Release() + collItr, err = s.db.GetIterator(startKey, endKey) if err != nil { return err } diff --git a/core/ledger/pvtdatastorage/store_test.go b/core/ledger/pvtdatastorage/store_test.go index 2cf68ef7d2a..87a4ffc541f 100644 --- a/core/ledger/pvtdatastorage/store_test.go +++ b/core/ledger/pvtdatastorage/store_test.go @@ -1854,6 +1854,69 @@ func testutilWaitForCollElgProcToFinish(s *Store) { s.collElgProcSync.waitForDone() } +func TestProcessCollElgEventsIteratorRecreatedAfterPurge(t *testing.T) { + btlPolicy := btltestutil.SampleBTLPolicy(map[[2]string]uint64{ + {"ns-1", "coll-1"}: 1, + }) + conf := pvtDataConf() + conf.MaxBatchSize = 1 + conf.BatchesInterval = 500 + + env := NewTestStoreEnv(t, "TestCollElgStaleIter", btlPolicy, conf) + defer env.Cleanup() + s := env.TestStore + + s.purgeInterval = math.MaxUint64 + + s.purgerLock.Lock() + s.purgerLock.Unlock() //lint:ignore SA2001 syncpoint + + require.NoError(t, s.Commit(0, nil, nil, nil)) + + const numBlocks = uint64(10) + for blk := uint64(1); blk <= numBlocks; blk++ { + missingData := make(ledger.TxMissingPvtData) + missingData.Add(1, "ns-1", "coll-1", false) + require.NoError(t, s.Commit(blk, nil, missingData, nil)) + } + + const lastBlock = uint64(15) + for blk := numBlocks + 1; blk <= lastBlock; blk++ { + require.NoError(t, s.Commit(blk, nil, nil, nil)) + } + + { + key := encodeCollElgKey(lastBlock) + m := newCollElgInfo(map[string][]string{"ns-1": []string{"coll-1"}}) + val, err := encodeCollElgVal(m) + require.NoError(t, err) + b := s.db.NewUpdateBatch() + b.Put(key, val) + require.NoError(t, s.db.WriteBatch(b, true)) + } + + procErr := make(chan error, 1) + go func() { + procErr <- s.processCollElgEvents() + }() + + time.Sleep(100 * time.Millisecond) + + s.purgerLock.Lock() + require.NoError(t, s.purgeExpiredData(0, lastBlock)) + s.purgerLock.Unlock() + + require.NoError(t, <-procErr) + + for blk := uint64(1); blk <= numBlocks; blk++ { + k := &missingDataKey{nsCollBlk: nsCollBlk{ns: "ns-1", coll: "coll-1", blkNum: blk}} + require.False(t, + testElgPrioMissingDataKeyExists(t, s, k), + "orphaned eligible prioritized missing data entry must not exist for expired block %d", blk, + ) + } +} + func produceSamplePvtdata(t *testing.T, txNum uint64, nsColls []string) *ledger.TxPvtData { builder := rwsetutil.NewRWSetBuilder() for _, nsColl := range nsColls {