diff --git a/p2p/net/mock/mock_notif_test.go b/p2p/net/mock/mock_notif_test.go index 488e86d1bcd9c1eb579b3ecdd0157be15c30d2de..3521d6f869cb73ea7aa12d422b07c2f3e8c55ecd 100644 --- a/p2p/net/mock/mock_notif_test.go +++ b/p2p/net/mock/mock_notif_test.go @@ -11,8 +11,9 @@ import ( ) func TestNotifications(t *testing.T) { + const swarmSize = 5 - mn, err := FullMeshLinked(context.Background(), 5) + mn, err := FullMeshLinked(context.Background(), swarmSize) if err != nil { t.Fatal(err) } @@ -23,7 +24,7 @@ func TestNotifications(t *testing.T) { nets := mn.Nets() notifiees := make([]*netNotifiee, len(nets)) for i, pn := range nets { - n := newNetNotifiee() + n := newNetNotifiee(swarmSize) pn.Notify(n) notifiees[i] = n } @@ -193,14 +194,14 @@ type netNotifiee struct { closedStream chan inet.Stream } -func newNetNotifiee() *netNotifiee { +func newNetNotifiee(buffer int) *netNotifiee { return &netNotifiee{ - listen: make(chan ma.Multiaddr), - listenClose: make(chan ma.Multiaddr), - connected: make(chan inet.Conn), - disconnected: make(chan inet.Conn), - openedStream: make(chan inet.Stream), - closedStream: make(chan inet.Stream), + listen: make(chan ma.Multiaddr, buffer), + listenClose: make(chan ma.Multiaddr, buffer), + connected: make(chan inet.Conn, buffer), + disconnected: make(chan inet.Conn, buffer), + openedStream: make(chan inet.Stream, buffer), + closedStream: make(chan inet.Stream, buffer), } } diff --git a/p2p/net/mock/mock_peernet.go b/p2p/net/mock/mock_peernet.go index d0265b878b7324c179681e788988c071a695bc32..26d5a3bdc971241ff6f789ac6e3e79831ea1a631 100644 --- a/p2p/net/mock/mock_peernet.go +++ b/p2p/net/mock/mock_peernet.go @@ -31,7 +31,7 @@ type peernet struct { streamHandler inet.StreamHandler connHandler inet.ConnHandler - notifmu sync.RWMutex + notifmu sync.Mutex notifs map[inet.Notifiee]struct{} proc goprocess.Process @@ -381,11 +381,17 @@ func (pn *peernet) StopNotify(f inet.Notifiee) { // notifyAll runs the notification function on all Notifiees func (pn *peernet) notifyAll(notification func(f inet.Notifiee)) { - pn.notifmu.RLock() + pn.notifmu.Lock() + var wg sync.WaitGroup for n := range pn.notifs { // make sure we dont block // and they dont block each other. - go notification(n) + wg.Add(1) + go func(n inet.Notifiee) { + defer wg.Done() + notification(n) + }(n) } - pn.notifmu.RUnlock() + wg.Wait() + pn.notifmu.Unlock() }