Commit be2e7abf authored by Juan Batiz-Benet

p2p/net/swarm: dial once at a time

parent 23cf9d6f
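For context, the change below prevents N concurrent `Dial` calls to the same peer from kicking off N simultaneous dials: the first caller registers a channel in the swarm's `dialing` map and performs the dial; every other caller blocks on that channel and then loops to re-check for a connection. Here is a minimal, self-contained sketch of that pattern (illustrative only, not the commit's code; names like `dialDedup` and `peerID` are hypothetical):

```go
// Sketch of the "dial once at a time" pattern: the first caller for a
// given peer performs the dial; concurrent callers wait on a per-peer
// channel, then loop and re-check. Hypothetical names throughout.
package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

type peerID string

type dialDedup struct {
	mu      sync.Mutex
	dialing map[peerID]chan struct{} // peers with a dial in flight
	conns   map[peerID]string        // stand-in for real connections
}

func (d *dialDedup) dial(ctx context.Context, p peerID) (string, error) {
	for {
		d.mu.Lock()
		// reuse an existing connection if one appeared meanwhile.
		if c, ok := d.conns[p]; ok {
			d.mu.Unlock()
			return c, nil
		}
		done, found := d.dialing[p]
		if !found { // no dial in flight: we become the dialer.
			done = make(chan struct{})
			d.dialing[p] = done
		}
		d.mu.Unlock()

		if found {
			select {
			case <-done: // another dial finished; re-check (it may have failed).
				continue
			case <-ctx.Done():
				return "", ctx.Err()
			}
		}

		// perform the "dial", then wake all waiters, success or not.
		time.Sleep(10 * time.Millisecond) // stand-in for the real dial
		c := "conn-to-" + string(p)

		d.mu.Lock()
		d.conns[p] = c
		delete(d.dialing, p)
		close(done)
		d.mu.Unlock()
		return c, nil
	}
}

func main() {
	d := &dialDedup{
		dialing: map[peerID]chan struct{}{},
		conns:   map[peerID]string{},
	}
	var wg sync.WaitGroup
	for i := 0; i < 10; i++ { // 10 concurrent dials, one real "dial".
		wg.Add(1)
		go func() {
			defer wg.Done()
			c, err := d.dial(context.Background(), "QmPeer")
			fmt.Println(c, err)
		}()
	}
	wg.Wait()
}
```

Note that waiters `continue` rather than reuse the dialer's result: the dial may have failed, or its connection may already be gone, so looping re-runs the connection check. This mirrors the `continue` inside the commit's `Dial` loop.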
@@ -11,6 +11,49 @@ import (
 	ma "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr"
 )
+func TestSimultDials(t *testing.T) {
+	// t.Skip("skipping for another test")
+
+	ctx := context.Background()
+	swarms := makeSwarms(ctx, t, 2)
+
+	// connect everyone
+	{
+		var wg sync.WaitGroup
+		connect := func(s *Swarm, dst peer.ID, addr ma.Multiaddr) {
+			// copy for other peer
+			log.Debugf("TestSimultOpen: connecting: %s --> %s (%s)", s.local, dst, addr)
+			s.peers.AddAddress(dst, addr)
+			if _, err := s.Dial(ctx, dst); err != nil {
+				t.Fatal("error swarm dialing to peer", err)
+			}
+			wg.Done()
+		}
+
+		log.Info("Connecting swarms simultaneously.")
+		for i := 0; i < 10; i++ { // connect 10x for each.
+			wg.Add(2)
+			go connect(swarms[0], swarms[1].local, swarms[1].ListenAddresses()[0])
+			go connect(swarms[1], swarms[0].local, swarms[0].ListenAddresses()[0])
+		}
+		wg.Wait()
+	}
+
+	// should still just have 1, at most 2 connections :)
+	c01l := len(swarms[0].ConnectionsToPeer(swarms[1].local))
+	if c01l > 2 {
+		t.Error("0->1 has", c01l)
+	}
+	c10l := len(swarms[1].ConnectionsToPeer(swarms[0].local))
+	if c10l > 2 {
+		t.Error("1->0 has", c10l)
+	}
+
+	for _, s := range swarms {
+		s.Close()
+	}
+}
+
 func TestSimultOpen(t *testing.T) {
 	// t.Skip("skipping for another test")
...
@@ -4,6 +4,7 @@ package swarm
 import (
 	"fmt"
+	"sync"
 	inet "github.com/jbenet/go-ipfs/p2p/net"
 	addrutil "github.com/jbenet/go-ipfs/p2p/net/swarm/addr"
@@ -33,6 +34,11 @@ type Swarm struct {
 	peers peer.Peerstore
 	connh ConnHandler
 
+	// dialing is a channel for the current peers being dialed.
+	// this way, we don't kick off N dials simultaneously.
+	dialing   map[peer.ID]chan struct{}
+	dialingmu sync.Mutex
+
 	cg ctxgroup.ContextGroup
 }
@@ -49,10 +55,11 @@ func NewSwarm(ctx context.Context, listenAddrs []ma.Multiaddr,
 	}
 
 	s := &Swarm{
 		swarm: ps.NewSwarm(PSTransport),
 		local: local,
 		peers: peers,
 		cg:    ctxgroup.WithContext(ctx),
+		dialing: map[peer.ID]chan struct{}{},
 	}
 
 	// configure Swarm
...
@@ -25,12 +25,41 @@ func (s *Swarm) Dial(ctx context.Context, p peer.ID) (*Conn, error) {
 		return nil, errors.New("Attempted connection to self!")
 	}
 
-	// check if we already have an open connection first
-	cs := s.ConnectionsToPeer(p)
-	for _, c := range cs {
-		if c != nil { // dump out the first one we find
-			return c, nil
-		}
-	}
+	for {
+		// check if we already have an open connection first
+		cs := s.ConnectionsToPeer(p)
+		for _, c := range cs {
+			if c != nil { // dump out the first one we find
+				return c, nil
+			}
+		}
+
+		// check if there's an ongoing dial to this peer
+		s.dialingmu.Lock()
+		dialDone, found := s.dialing[p]
+		if !found { // if not, set one up.
+			dialDone = make(chan struct{})
+			s.dialing[p] = dialDone
+		}
+		s.dialingmu.Unlock()
+
+		if found {
+			select {
+			case <-dialDone: // wait for that dial to finish.
+				continue // and see if it worked (loop). it may not have.
+			case <-ctx.Done():
+				return nil, ctx.Err()
+			}
+		}
+
+		// else, we're the ones dialing for others.
+		defer func() {
+			s.dialingmu.Lock()
+			delete(s.dialing, p)
+			close(dialDone)
+			s.dialingmu.Unlock()
+		}()
+
+		break
+	}
 
 	sk := s.peers.PrivKey(s.local)
...