Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,11 @@ func SetSleeper(d sleeperSetter, sleeper customSleeper) {
}

type testSleeper struct {
c int32
c atomic.Int32
}

func (s *testSleeper) Sleep(duration time.Duration) {
atomic.AddInt32(&s.c, 1)
s.c.Add(1)
}

func TestSleeper(t *testing.T) {
Expand Down Expand Up @@ -75,7 +75,7 @@ func TestSleeper(t *testing.T) {

require.Eventually(t,
func() bool {
return atomic.LoadInt32(&s.c) == 10
return s.c.Load() == 10
},
10*time.Second, time.Millisecond)
})
Expand Down
12 changes: 6 additions & 6 deletions discovery/client/signer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,9 @@ func TestSameMessage(t *testing.T) {

func TestDifferentMessages(t *testing.T) {
var n uint = 50
var signedInvokedCount uint32
var signedInvokedCount atomic.Uint32
sign := func(msg []byte) ([]byte, error) {
atomic.AddUint32(&signedInvokedCount, 1)
signedInvokedCount.Add(1)
return msg, nil
}

Expand All @@ -64,21 +64,21 @@ func TestDifferentMessages(t *testing.T) {

// Query once
parallelSignRange(0, n)
require.Equal(t, uint32(n), atomic.LoadUint32(&signedInvokedCount))
require.Equal(t, uint32(n), signedInvokedCount.Load())

// Query twice
parallelSignRange(0, n)
require.Equal(t, uint32(n), atomic.LoadUint32(&signedInvokedCount))
require.Equal(t, uint32(n), signedInvokedCount.Load())

// Query thrice on a disjoint range
for i := n + 1; i < 2*n; i++ {
parallelSignRange(i, i+1)
}
oldSignedInvokedCount := atomic.LoadUint32(&signedInvokedCount)
oldSignedInvokedCount := signedInvokedCount.Load()

// Ensure that some of the early messages 0-n were purged from memory
parallelSignRange(0, n)
require.True(t, oldSignedInvokedCount < atomic.LoadUint32(&signedInvokedCount))
require.True(t, oldSignedInvokedCount < signedInvokedCount.Load())
}

func TestFailure(t *testing.T) {
Expand Down
16 changes: 8 additions & 8 deletions discovery/test/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -291,7 +291,7 @@ func TestRevocation(t *testing.T) {
res, err := client.Send(context.Background(), req, client.AuthInfo)
require.NoError(t, err)
// Record number of times we deserialized the identity
firstCount := atomic.LoadUint32(&service.sup.deserializeIdentityCount)
firstCount := service.sup.deserializeIdentityCount.Load()

// Do the same query again
peers, err := res.ForChannel("mychannel").Peers()
Expand All @@ -302,7 +302,7 @@ func TestRevocation(t *testing.T) {
require.NoError(t, err)
// The amount of times deserializeIdentity was called should not have changed
// because requests should have hit the cache
secondCount := atomic.LoadUint32(&service.sup.deserializeIdentityCount)
secondCount := service.sup.deserializeIdentityCount.Load()
require.Equal(t, firstCount, secondCount)

// Now, increment the config sequence
Expand All @@ -312,14 +312,14 @@ func TestRevocation(t *testing.T) {
service.sup.sequenceWrapper.instance.Store(v)

// Revoke all identities inside the MSP manager
atomic.AddUint32(&service.sup.mspWrapper.blocks, uint32(1))
service.sup.mspWrapper.blocks.Add(uint32(1))

// Send the query for the third time
res, err = client.Send(context.Background(), req, client.AuthInfo)
require.NoError(t, err)
// The cache should have been purged, thus deserializeIdentity should have been
// called an additional time
thirdCount := atomic.LoadUint32(&service.sup.deserializeIdentityCount)
thirdCount := service.sup.deserializeIdentityCount.Load()
require.NotEqual(t, thirdCount, secondCount)

// We should be denied access
Expand All @@ -339,15 +339,15 @@ func (c *client) newConnection() (*grpc.ClientConn, error) {
}

type mspWrapper struct {
deserializeIdentityCount uint32
deserializeIdentityCount atomic.Uint32
msp.MSPManager
mspConfigs map[string]*msprotos.FabricMSPConfig
blocks uint32
blocks atomic.Uint32
}

func (w *mspWrapper) DeserializeIdentity(serializedIdentity []byte) (msp.Identity, error) {
atomic.AddUint32(&w.deserializeIdentityCount, 1)
if atomic.LoadUint32(&w.blocks) == uint32(1) {
w.deserializeIdentityCount.Add(1)
if w.blocks.Load() == uint32(1) {
return nil, errors.New("failed deserializing identity")
}
return w.MSPManager.DeserializeIdentity(serializedIdentity)
Expand Down
6 changes: 3 additions & 3 deletions gossip/comm/comm_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1077,7 +1077,7 @@ func waitForMessages(t *testing.T, msgChan chan uint64, count int, errMsg string
}

func TestConcurrentCloseSend(t *testing.T) {
var stopping int32
var stopping atomic.Int32

comm1, _ := newCommInstance(t, naiveSec)
comm2, port2 := newCommInstance(t, naiveSec)
Expand All @@ -1092,12 +1092,12 @@ func TestConcurrentCloseSend(t *testing.T) {
comm1.Send(createGossipMsg(), remotePeer(port2))
close(ready)

for atomic.LoadInt32(&stopping) == int32(0) {
for stopping.Load() == int32(0) {
comm1.Send(createGossipMsg(), remotePeer(port2))
}
}()
<-ready
comm2.Stop()
atomic.StoreInt32(&stopping, int32(1))
stopping.Store(int32(1))
<-done
}
8 changes: 4 additions & 4 deletions gossip/comm/metrics_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,9 @@ func newCommInstanceWithMetrics(t *testing.T, sec *naiveSecProvider, metrics *me
func TestMetrics(t *testing.T) {
testMetricProvider := mocks.TestUtilConstructMetricProvider()

var overflown uint32
var overflown atomic.Uint32
testMetricProvider.FakeBufferOverflow.AddStub = func(delta float64) {
atomic.StoreUint32(&overflown, uint32(1))
overflown.Store(uint32(1))
}

fakeCommMetrics := metrics.NewGossipMetrics(testMetricProvider.FakeProvider).CommMetrics
Expand Down Expand Up @@ -59,7 +59,7 @@ func TestMetrics(t *testing.T) {
// Send messages until the buffer overflow event emission is detected
for {
comm1.Send(createGossipMsg(), remotePeer(port2))
if atomic.LoadUint32(&overflown) == uint32(1) {
if overflown.Load() == uint32(1) {
t.Log("Buffer overflow detected")
break
}
Expand All @@ -80,5 +80,5 @@ func TestMetrics(t *testing.T) {
testMetricProvider.FakeBufferOverflow.AddArgsForCall(0),
)

require.Equal(t, uint32(1), atomic.LoadUint32(&overflown))
require.Equal(t, uint32(1), overflown.Load())
}
32 changes: 16 additions & 16 deletions gossip/discovery/discovery_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,8 +94,8 @@ func (m *mockAnchorPeerTracker) IsAnchorPeer(endpoint string) bool {

type dummyCommModule struct {
validatedMessages chan *protoext.SignedGossipMessage
msgsReceived uint32
msgsSent uint32
msgsReceived atomic.Uint32
msgsSent atomic.Uint32
id string
identitySwitch chan common.PKIidType
presumeDead chan common.PKIidType
Expand All @@ -108,7 +108,7 @@ type dummyCommModule struct {
shouldGossip bool
disableComm bool
mock *mock.Mock
signCount uint32
signCount atomic.Uint32
}

type gossipInstance struct {
Expand Down Expand Up @@ -145,7 +145,7 @@ func (comm *dummyCommModule) recordValidation(validatedMessages chan *protoext.S
}

func (comm *dummyCommModule) SignMessage(am *proto.GossipMessage, internalEndpoint string) *proto.Envelope {
atomic.AddUint32(&comm.signCount, 1)
comm.signCount.Add(1)
protoext.NoopSign(am)

secret := &proto.Secret{
Expand Down Expand Up @@ -207,7 +207,7 @@ func (comm *dummyCommModule) SendToPeer(peer *NetworkMember, msg *protoext.Signe
s, _ := protoext.NoopSign(msg.GossipMessage)
comm.streams[peer.Endpoint].Send(s.Envelope)
comm.lock.Unlock()
atomic.AddUint32(&comm.msgsSent, 1)
comm.msgsSent.Add(1)
}

func (comm *dummyCommModule) Ping(peer *NetworkMember) bool {
Expand Down Expand Up @@ -262,11 +262,11 @@ func (comm *dummyCommModule) CloseConn(peer *NetworkMember) {
}

func (g *gossipInstance) receivedMsgCount() int {
return int(atomic.LoadUint32(&g.comm.msgsReceived))
return int(g.comm.msgsReceived.Load())
}

func (g *gossipInstance) sentMsgCount() int {
return int(atomic.LoadUint32(&g.comm.msgsSent))
return int(g.comm.msgsSent.Load())
}

func (g *gossipInstance) discoveryImpl() *gossipDiscoveryImpl {
Expand Down Expand Up @@ -313,7 +313,7 @@ func (g *gossipInstance) GossipStream(stream proto.Gossip_GossipStreamServer) er
ID: common.PKIidType("testID"),
},
}
atomic.AddUint32(&g.comm.msgsReceived, 1)
g.comm.msgsReceived.Add(1)

if aliveMsg := gMsg.GetAliveMsg(); aliveMsg != nil {
g.tryForwardMessage(gMsg)
Expand Down Expand Up @@ -598,10 +598,10 @@ func TestNoSigningIfNoMembership(t *testing.T) {
inst := createDiscoveryInstance(8931, "foreveralone", nil)
defer inst.Stop()
time.Sleep(defaultTestConfig.AliveTimeInterval * 10)
assert.Zero(t, atomic.LoadUint32(&inst.comm.signCount))
assert.Zero(t, inst.comm.signCount.Load())

inst.InitiateSync(10000)
assert.Zero(t, atomic.LoadUint32(&inst.comm.signCount))
assert.Zero(t, inst.comm.signCount.Load())
}

func TestValidation(t *testing.T) {
Expand Down Expand Up @@ -1167,31 +1167,31 @@ func TestCertificateChange(t *testing.T) {
// Shutdown the second peer
waitUntilOrFailBlocking(t, p2.Stop)

var pingCountFrom1 uint32
var pingCountFrom3 uint32
var pingCountFrom1 atomic.Uint32
var pingCountFrom3 atomic.Uint32
// Program mocks to increment ping counters
p1.comm.lock.Lock()
p1.comm.mock = &mock.Mock{}
p1.comm.mock.On("SendToPeer", mock.Anything, mock.Anything)
p1.comm.mock.On("Ping").Run(func(arguments mock.Arguments) {
atomic.AddUint32(&pingCountFrom1, 1)
pingCountFrom1.Add(1)
})
p1.comm.lock.Unlock()

p3.comm.lock.Lock()
p3.comm.mock = &mock.Mock{}
p3.comm.mock.On("SendToPeer", mock.Anything, mock.Anything)
p3.comm.mock.On("Ping").Run(func(arguments mock.Arguments) {
atomic.AddUint32(&pingCountFrom3, 1)
pingCountFrom3.Add(1)
})
p3.comm.lock.Unlock()

pingCount1 := func() uint32 {
return atomic.LoadUint32(&pingCountFrom1)
return pingCountFrom1.Load()
}

pingCount3 := func() uint32 {
return atomic.LoadUint32(&pingCountFrom3)
return pingCountFrom3.Load()
}

c1 := pingCount1()
Expand Down
30 changes: 15 additions & 15 deletions gossip/election/election.go
Original file line number Diff line number Diff line change
Expand Up @@ -175,9 +175,9 @@ type leaderElectionSvcImpl struct {
stopChan chan struct{}
interruptChan chan struct{}
stopWG sync.WaitGroup
isLeader int32
leaderExists int32
yield int32
isLeader atomic.Int32
leaderExists atomic.Int32
yield atomic.Int32
sleeping bool
adapter LeaderElectionAdapter
logger util.Logger
Expand Down Expand Up @@ -224,7 +224,7 @@ func (le *leaderElectionSvcImpl) handleMessage(msg Msg) {
if msg.IsProposal() {
le.proposals.Add(string(msg.SenderID()))
} else if msg.IsDeclaration() {
atomic.StoreInt32(&le.leaderExists, int32(1))
le.leaderExists.Store(int32(1))
if le.sleeping && len(le.interruptChan) == 0 {
le.interruptChan <- struct{}{}
}
Expand Down Expand Up @@ -317,7 +317,7 @@ func (le *leaderElectionSvcImpl) leaderElection() {
// If we got here, there is no one that proposed being a leader
// that's a better candidate than us.
le.beLeader()
atomic.StoreInt32(&le.leaderExists, int32(1))
le.leaderExists.Store(int32(1))
}

// propose sends a leadership proposal message to remote peers
Expand All @@ -333,7 +333,7 @@ func (le *leaderElectionSvcImpl) follower() {
defer le.logger.Debug(le.id, ": Exiting")

le.proposals.Clear()
atomic.StoreInt32(&le.leaderExists, int32(0))
le.leaderExists.Store(int32(0))
le.adapter.ReportMetrics(false)
select {
case <-time.After(le.config.LeaderAliveThreshold):
Expand Down Expand Up @@ -384,25 +384,25 @@ func (le *leaderElectionSvcImpl) isAlive(id peerID) bool {
}

func (le *leaderElectionSvcImpl) isLeaderExists() bool {
return atomic.LoadInt32(&le.leaderExists) == int32(1)
return le.leaderExists.Load() == int32(1)
}

// IsLeader returns whether this peer is a leader
func (le *leaderElectionSvcImpl) IsLeader() bool {
isLeader := atomic.LoadInt32(&le.isLeader) == int32(1)
isLeader := le.isLeader.Load() == int32(1)
le.logger.Debug(le.id, ": Returning", isLeader)
return isLeader
}

func (le *leaderElectionSvcImpl) beLeader() {
le.logger.Info(le.id, ": Becoming a leader")
atomic.StoreInt32(&le.isLeader, int32(1))
le.isLeader.Store(int32(1))
le.callback(true)
}

func (le *leaderElectionSvcImpl) stopBeingLeader() {
le.logger.Info(le.id, "Stopped being a leader")
atomic.StoreInt32(&le.isLeader, int32(0))
le.isLeader.Store(int32(0))
le.callback(false)
}

Expand All @@ -416,14 +416,14 @@ func (le *leaderElectionSvcImpl) shouldStop() bool {
}

func (le *leaderElectionSvcImpl) isYielding() bool {
return atomic.LoadInt32(&le.yield) == int32(1)
return le.yield.Load() == int32(1)
}

func (le *leaderElectionSvcImpl) stopYielding() {
le.logger.Debug("Stopped yielding")
le.Lock()
defer le.Unlock()
atomic.StoreInt32(&le.yield, int32(0))
le.yield.Store(int32(0))
le.yieldTimer.Stop()
}

Expand All @@ -436,14 +436,14 @@ func (le *leaderElectionSvcImpl) Yield() {
return
}
// Turn on the yield flag
atomic.StoreInt32(&le.yield, int32(1))
le.yield.Store(int32(1))
// Stop being a leader
le.stopBeingLeader()
// Clear the leader exists flag since it could be that we are the leader
atomic.StoreInt32(&le.leaderExists, int32(0))
le.leaderExists.Store(int32(0))
// Clear the yield flag in any case afterwards
le.yieldTimer = time.AfterFunc(le.config.LeaderAliveThreshold*6, func() {
atomic.StoreInt32(&le.yield, int32(0))
le.yield.Store(int32(0))
})
}

Expand Down
Loading