Skip to content
GitLab
Menu
Projects
Groups
Snippets
Loading...
Help
Help
Support
Community forum
Keyboard shortcuts
?
Submit feedback
Sign in / Register
Toggle navigation
Menu
Open sidebar
adam.huang
go-libp2p
Commits
ebb51b28
Commit
ebb51b28
authored
Jan 13, 2015
by
Juan Batiz-Benet
Browse files
Merge pull request #551 from jbenet/fix-swarm-more-connect
p2p/net/swarm: more connection bugs
parents
aadb0bb3
0789599c
Changes
7
Hide whitespace changes
Inline
Side-by-side
net/swarm/dial_test.go
0 → 100644
View file @
ebb51b28
package
swarm
import
(
"net"
"os"
"sync"
"testing"
"time"
addrutil
"github.com/jbenet/go-ipfs/p2p/net/swarm/addr"
peer
"github.com/jbenet/go-ipfs/p2p/peer"
testutil
"github.com/jbenet/go-ipfs/util/testutil"
context
"github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
ma
"github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr"
manet
"github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr-net"
)
// acceptAndHang accepts every connection offered by l and keeps it open
// without ever reading from or writing to it. The loop ends as soon as
// Accept reports an error (for example because l was closed); at that point
// all connections collected so far are closed.
func acceptAndHang(l net.Listener) {
	held := make([]net.Conn, 0, 10)

	// Registered up front; runs once the accept loop below returns.
	defer func() {
		for i := range held {
			held[i].Close()
		}
	}()

	for {
		conn, err := l.Accept()
		if err != nil {
			return
		}
		if conn == nil {
			continue
		}
		held = append(held, conn)
	}
}
func
TestSimultDials
(
t
*
testing
.
T
)
{
// t.Skip("skipping for another test")
t
.
Parallel
()
ctx
:=
context
.
Background
()
swarms
:=
makeSwarms
(
ctx
,
t
,
2
)
// connect everyone
{
var
wg
sync
.
WaitGroup
connect
:=
func
(
s
*
Swarm
,
dst
peer
.
ID
,
addr
ma
.
Multiaddr
)
{
// copy for other peer
log
.
Debugf
(
"TestSimultOpen: connecting: %s --> %s (%s)"
,
s
.
local
,
dst
,
addr
)
s
.
peers
.
AddAddress
(
dst
,
addr
)
if
_
,
err
:=
s
.
Dial
(
ctx
,
dst
);
err
!=
nil
{
t
.
Fatal
(
"error swarm dialing to peer"
,
err
)
}
wg
.
Done
()
}
ifaceAddrs0
,
err
:=
swarms
[
0
]
.
InterfaceListenAddresses
()
if
err
!=
nil
{
t
.
Fatal
(
err
)
}
ifaceAddrs1
,
err
:=
swarms
[
1
]
.
InterfaceListenAddresses
()
if
err
!=
nil
{
t
.
Fatal
(
err
)
}
log
.
Info
(
"Connecting swarms simultaneously."
)
for
i
:=
0
;
i
<
10
;
i
++
{
// connect 10x for each.
wg
.
Add
(
2
)
go
connect
(
swarms
[
0
],
swarms
[
1
]
.
local
,
ifaceAddrs1
[
0
])
go
connect
(
swarms
[
1
],
swarms
[
0
]
.
local
,
ifaceAddrs0
[
0
])
}
wg
.
Wait
()
}
// should still just have 1, at most 2 connections :)
c01l
:=
len
(
swarms
[
0
]
.
ConnectionsToPeer
(
swarms
[
1
]
.
local
))
if
c01l
>
2
{
t
.
Error
(
"0->1 has"
,
c01l
)
}
c10l
:=
len
(
swarms
[
1
]
.
ConnectionsToPeer
(
swarms
[
0
]
.
local
))
if
c10l
>
2
{
t
.
Error
(
"1->0 has"
,
c10l
)
}
for
_
,
s
:=
range
swarms
{
s
.
Close
()
}
}
func
newSilentPeer
(
t
*
testing
.
T
)
(
peer
.
ID
,
ma
.
Multiaddr
,
net
.
Listener
)
{
dst
:=
testutil
.
RandPeerIDFatal
(
t
)
lst
,
err
:=
net
.
Listen
(
"tcp"
,
":0"
)
if
err
!=
nil
{
t
.
Fatal
(
err
)
}
addr
,
err
:=
manet
.
FromNetAddr
(
lst
.
Addr
())
if
err
!=
nil
{
t
.
Fatal
(
err
)
}
addrs
:=
[]
ma
.
Multiaddr
{
addr
}
addrs
,
err
=
addrutil
.
ResolveUnspecifiedAddresses
(
addrs
,
nil
)
if
err
!=
nil
{
t
.
Fatal
(
err
)
}
t
.
Log
(
"new silent peer:"
,
dst
,
addrs
[
0
])
return
dst
,
addrs
[
0
],
lst
}
func
TestDialWait
(
t
*
testing
.
T
)
{
// t.Skip("skipping for another test")
t
.
Parallel
()
ctx
:=
context
.
Background
()
swarms
:=
makeSwarms
(
ctx
,
t
,
1
)
s1
:=
swarms
[
0
]
defer
s1
.
Close
()
s1
.
dialT
=
time
.
Millisecond
*
300
// lower timeout for tests.
if
os
.
Getenv
(
"TRAVIS"
)
==
"1"
{
s1
.
dialT
=
time
.
Second
}
// dial to a non-existent peer.
s2p
,
s2addr
,
s2l
:=
newSilentPeer
(
t
)
go
acceptAndHang
(
s2l
)
defer
s2l
.
Close
()
s1
.
peers
.
AddAddress
(
s2p
,
s2addr
)
before
:=
time
.
Now
()
if
c
,
err
:=
s1
.
Dial
(
ctx
,
s2p
);
err
==
nil
{
defer
c
.
Close
()
t
.
Fatal
(
"error swarm dialing to unknown peer worked..."
,
err
)
}
else
{
t
.
Log
(
"correctly got error:"
,
err
)
}
duration
:=
time
.
Now
()
.
Sub
(
before
)
dt
:=
s1
.
dialT
if
duration
<
dt
*
dialAttempts
{
t
.
Error
(
"< DialTimeout * dialAttempts not being respected"
,
duration
,
dt
*
dialAttempts
)
}
if
duration
>
2
*
dt
*
dialAttempts
{
t
.
Error
(
"> 2*DialTimeout * dialAttempts not being respected"
,
duration
,
2
*
dt
*
dialAttempts
)
}
if
!
s1
.
backf
.
Backoff
(
s2p
)
{
t
.
Error
(
"s2 should now be on backoff"
)
}
}
func
TestDialBackoff
(
t
*
testing
.
T
)
{
// t.Skip("skipping for another test")
if
os
.
Getenv
(
"TRAVIS"
)
==
"1"
{
t
.
Skip
(
"travis will never have fun with this test"
)
}
t
.
Parallel
()
ctx
:=
context
.
Background
()
swarms
:=
makeSwarms
(
ctx
,
t
,
2
)
s1
:=
swarms
[
0
]
s2
:=
swarms
[
1
]
defer
s1
.
Close
()
defer
s2
.
Close
()
s1
.
dialT
=
time
.
Millisecond
*
500
// lower timeout for tests.
s2
.
dialT
=
time
.
Millisecond
*
500
// lower timeout for tests.
s2addrs
,
err
:=
s2
.
InterfaceListenAddresses
()
if
err
!=
nil
{
t
.
Fatal
(
err
)
}
s1
.
peers
.
AddAddresses
(
s2
.
local
,
s2addrs
)
// dial to a non-existent peer.
s3p
,
s3addr
,
s3l
:=
newSilentPeer
(
t
)
go
acceptAndHang
(
s3l
)
defer
s3l
.
Close
()
s1
.
peers
.
AddAddress
(
s3p
,
s3addr
)
// in this test we will:
// 1) dial 10x to each node.
// 2) all dials should hang
// 3) s1->s2 should succeed.
// 4) s1->s3 should not (and should place s3 on backoff)
// 5) disconnect entirely
// 6) dial 10x to each node again
// 7) s3 dials should all return immediately (except 1)
// 8) s2 dials should all hang, and succeed
// 9) last s3 dial ends, unsuccessful
dialOnlineNode
:=
func
(
dst
peer
.
ID
,
times
int
)
<-
chan
bool
{
ch
:=
make
(
chan
bool
)
for
i
:=
0
;
i
<
times
;
i
++
{
go
func
()
{
if
_
,
err
:=
s1
.
Dial
(
ctx
,
dst
);
err
!=
nil
{
t
.
Error
(
"error dialing"
,
dst
,
err
)
ch
<-
false
}
else
{
ch
<-
true
}
}()
}
return
ch
}
dialOfflineNode
:=
func
(
dst
peer
.
ID
,
times
int
)
<-
chan
bool
{
ch
:=
make
(
chan
bool
)
for
i
:=
0
;
i
<
times
;
i
++
{
go
func
()
{
if
c
,
err
:=
s1
.
Dial
(
ctx
,
dst
);
err
!=
nil
{
ch
<-
false
}
else
{
t
.
Error
(
"succeeded in dialing"
,
dst
)
ch
<-
true
c
.
Close
()
}
}()
}
return
ch
}
{
// 1) dial 10x to each node.
N
:=
10
s2done
:=
dialOnlineNode
(
s2
.
local
,
N
)
s3done
:=
dialOfflineNode
(
s3p
,
N
)
// when all dials should be done by:
dialTimeout1x
:=
time
.
After
(
s1
.
dialT
)
dialTimeout1Ax
:=
time
.
After
(
s1
.
dialT
*
dialAttempts
)
dialTimeout10Ax
:=
time
.
After
(
s1
.
dialT
*
dialAttempts
*
10
)
// 2) all dials should hang
select
{
case
<-
s2done
:
t
.
Error
(
"s2 should not happen immediately"
)
case
<-
s3done
:
t
.
Error
(
"s3 should not happen yet"
)
case
<-
time
.
After
(
time
.
Millisecond
)
:
// s2 may finish very quickly, so let's get out.
}
// 3) s1->s2 should succeed.
for
i
:=
0
;
i
<
N
;
i
++
{
select
{
case
r
:=
<-
s2done
:
if
!
r
{
t
.
Error
(
"s2 should not fail"
)
}
case
<-
s3done
:
t
.
Error
(
"s3 should not happen yet"
)
case
<-
dialTimeout1x
:
t
.
Error
(
"s2 took too long"
)
}
}
select
{
case
<-
s2done
:
t
.
Error
(
"s2 should have no more"
)
case
<-
s3done
:
t
.
Error
(
"s3 should not happen yet"
)
case
<-
dialTimeout1x
:
// let it pass
}
// 4) s1->s3 should not (and should place s3 on backoff)
// N-1 should finish before dialTimeout1Ax
for
i
:=
0
;
i
<
N
;
i
++
{
select
{
case
<-
s2done
:
t
.
Error
(
"s2 should have no more"
)
case
r
:=
<-
s3done
:
if
r
{
t
.
Error
(
"s3 should not succeed"
)
}
case
<-
dialTimeout1Ax
:
if
i
<
(
N
-
1
)
{
t
.
Fatal
(
"s3 took too long"
)
}
t
.
Log
(
"dialTimeout1Ax hit for last peer"
)
case
<-
dialTimeout10Ax
:
t
.
Fatal
(
"s3 took too long"
)
}
}
// check backoff state
if
s1
.
backf
.
Backoff
(
s2
.
local
)
{
t
.
Error
(
"s2 should not be on backoff"
)
}
if
!
s1
.
backf
.
Backoff
(
s3p
)
{
t
.
Error
(
"s3 should be on backoff"
)
}
// 5) disconnect entirely
for
_
,
c
:=
range
s1
.
Connections
()
{
c
.
Close
()
}
for
i
:=
0
;
i
<
100
&&
len
(
s1
.
Connections
())
>
0
;
i
++
{
<-
time
.
After
(
time
.
Millisecond
)
}
if
len
(
s1
.
Connections
())
>
0
{
t
.
Fatal
(
"s1 conns must exit"
)
}
}
{
// 6) dial 10x to each node again
N
:=
10
s2done
:=
dialOnlineNode
(
s2
.
local
,
N
)
s3done
:=
dialOfflineNode
(
s3p
,
N
)
// when all dials should be done by:
dialTimeout1x
:=
time
.
After
(
s1
.
dialT
)
dialTimeout1Ax
:=
time
.
After
(
s1
.
dialT
*
dialAttempts
)
dialTimeout10Ax
:=
time
.
After
(
s1
.
dialT
*
dialAttempts
*
10
)
// 7) s3 dials should all return immediately (except 1)
for
i
:=
0
;
i
<
N
-
1
;
i
++
{
select
{
case
<-
s2done
:
t
.
Error
(
"s2 should not succeed yet"
)
case
r
:=
<-
s3done
:
if
r
{
t
.
Error
(
"s3 should not succeed"
)
}
case
<-
dialTimeout1x
:
t
.
Fatal
(
"s3 took too long"
)
}
}
// 8) s2 dials should all hang, and succeed
for
i
:=
0
;
i
<
N
;
i
++
{
select
{
case
r
:=
<-
s2done
:
if
!
r
{
t
.
Error
(
"s2 should succeed"
)
}
// case <-s3done:
case
<-
dialTimeout1Ax
:
t
.
Fatal
(
"s3 took too long"
)
}
}
// 9) the last s3 should return, failed.
select
{
case
<-
s2done
:
t
.
Error
(
"s2 should have no more"
)
case
r
:=
<-
s3done
:
if
r
{
t
.
Error
(
"s3 should not succeed"
)
}
case
<-
dialTimeout10Ax
:
t
.
Fatal
(
"s3 took too long"
)
}
// check backoff state (the same)
if
s1
.
backf
.
Backoff
(
s2
.
local
)
{
t
.
Error
(
"s2 should not be on backoff"
)
}
if
!
s1
.
backf
.
Backoff
(
s3p
)
{
t
.
Error
(
"s3 should be on backoff"
)
}
}
}
func
TestDialBackoffClears
(
t
*
testing
.
T
)
{
// t.Skip("skipping for another test")
t
.
Parallel
()
ctx
:=
context
.
Background
()
swarms
:=
makeSwarms
(
ctx
,
t
,
2
)
s1
:=
swarms
[
0
]
s2
:=
swarms
[
1
]
defer
s1
.
Close
()
defer
s2
.
Close
()
s1
.
dialT
=
time
.
Millisecond
*
300
// lower timeout for tests.
s2
.
dialT
=
time
.
Millisecond
*
300
// lower timeout for tests.
if
os
.
Getenv
(
"TRAVIS"
)
==
"1"
{
s1
.
dialT
=
time
.
Second
s2
.
dialT
=
time
.
Second
}
// use another address first, that accept and hang on conns
_
,
s2bad
,
s2l
:=
newSilentPeer
(
t
)
go
acceptAndHang
(
s2l
)
defer
s2l
.
Close
()
// phase 1 -- dial to non-operational addresses
s1
.
peers
.
AddAddress
(
s2
.
local
,
s2bad
)
before
:=
time
.
Now
()
if
c
,
err
:=
s1
.
Dial
(
ctx
,
s2
.
local
);
err
==
nil
{
t
.
Fatal
(
"dialing to broken addr worked..."
,
err
)
defer
c
.
Close
()
}
else
{
t
.
Log
(
"correctly got error:"
,
err
)
}
duration
:=
time
.
Now
()
.
Sub
(
before
)
dt
:=
s1
.
dialT
if
duration
<
dt
*
dialAttempts
{
t
.
Error
(
"< DialTimeout * dialAttempts not being respected"
,
duration
,
dt
*
dialAttempts
)
}
if
duration
>
2
*
dt
*
dialAttempts
{
t
.
Error
(
"> 2*DialTimeout * dialAttempts not being respected"
,
duration
,
2
*
dt
*
dialAttempts
)
}
if
!
s1
.
backf
.
Backoff
(
s2
.
local
)
{
t
.
Error
(
"s2 should now be on backoff"
)
}
else
{
t
.
Log
(
"correctly added to backoff"
)
}
// phase 2 -- add the working address. dial should succeed.
ifaceAddrs1
,
err
:=
swarms
[
1
]
.
InterfaceListenAddresses
()
if
err
!=
nil
{
t
.
Fatal
(
err
)
}
s1
.
peers
.
AddAddresses
(
s2
.
local
,
ifaceAddrs1
)
before
=
time
.
Now
()
if
c
,
err
:=
s1
.
Dial
(
ctx
,
s2
.
local
);
err
!=
nil
{
t
.
Fatal
(
err
)
}
else
{
c
.
Close
()
t
.
Log
(
"correctly connected"
)
}
duration
=
time
.
Now
()
.
Sub
(
before
)
if
duration
>=
dt
{
// t.Error("took too long", duration, dt)
}
if
s1
.
backf
.
Backoff
(
s2
.
local
)
{
t
.
Error
(
"s2 should no longer be on backoff"
)
}
else
{
t
.
Log
(
"correctly cleared backoff"
)
}
}
net/swarm/simul_test.go
View file @
ebb51b28
...
...
@@ -11,51 +11,9 @@ import (
ma
"github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr"
)
func
TestSimultDials
(
t
*
testing
.
T
)
{
// t.Skip("skipping for another test")
ctx
:=
context
.
Background
()
swarms
:=
makeSwarms
(
ctx
,
t
,
2
)
// connect everyone
{
var
wg
sync
.
WaitGroup
connect
:=
func
(
s
*
Swarm
,
dst
peer
.
ID
,
addr
ma
.
Multiaddr
)
{
// copy for other peer
log
.
Debugf
(
"TestSimultOpen: connecting: %s --> %s (%s)"
,
s
.
local
,
dst
,
addr
)
s
.
peers
.
AddAddress
(
dst
,
addr
)
if
_
,
err
:=
s
.
Dial
(
ctx
,
dst
);
err
!=
nil
{
t
.
Fatal
(
"error swarm dialing to peer"
,
err
)
}
wg
.
Done
()
}
log
.
Info
(
"Connecting swarms simultaneously."
)
for
i
:=
0
;
i
<
10
;
i
++
{
// connect 10x for each.
wg
.
Add
(
2
)
go
connect
(
swarms
[
0
],
swarms
[
1
]
.
local
,
swarms
[
1
]
.
ListenAddresses
()[
0
])
go
connect
(
swarms
[
1
],
swarms
[
0
]
.
local
,
swarms
[
0
]
.
ListenAddresses
()[
0
])
}
wg
.
Wait
()
}
// should still just have 1, at most 2 connections :)
c01l
:=
len
(
swarms
[
0
]
.
ConnectionsToPeer
(
swarms
[
1
]
.
local
))
if
c01l
>
2
{
t
.
Error
(
"0->1 has"
,
c01l
)
}
c10l
:=
len
(
swarms
[
1
]
.
ConnectionsToPeer
(
swarms
[
0
]
.
local
))
if
c10l
>
2
{
t
.
Error
(
"1->0 has"
,
c10l
)
}
for
_
,
s
:=
range
swarms
{
s
.
Close
()
}
}
func
TestSimultOpen
(
t
*
testing
.
T
)
{
// t.Skip("skipping for another test")
t
.
Parallel
()
ctx
:=
context
.
Background
()
swarms
:=
makeSwarms
(
ctx
,
t
,
2
)
...
...
@@ -87,6 +45,7 @@ func TestSimultOpen(t *testing.T) {
func
TestSimultOpenMany
(
t
*
testing
.
T
)
{
// t.Skip("very very slow")
t
.
Parallel
()
addrs
:=
20
SubtestSwarm
(
t
,
addrs
,
10
)
...
...
@@ -97,6 +56,7 @@ func TestSimultOpenFewStress(t *testing.T) {
t
.
SkipNow
()
}
// t.Skip("skipping for another test")
t
.
Parallel
()
msgs
:=
40
swarms
:=
2
...
...
net/swarm/swarm.go
View file @
ebb51b28
...
...
@@ -4,6 +4,7 @@ package swarm
import
(
"fmt"
"time"
inet
"github.com/jbenet/go-ipfs/p2p/net"
addrutil
"github.com/jbenet/go-ipfs/p2p/net/swarm/addr"
...
...
@@ -32,7 +33,10 @@ type Swarm struct {
local
peer
.
ID
peers
peer
.
Peerstore
connh
ConnHandler
dsync
dialsync
backf
dialbackoff
dialT
time
.
Duration
// mainly for tests
cg
ctxgroup
.
ContextGroup
}
...
...
@@ -50,10 +54,11 @@ func NewSwarm(ctx context.Context, listenAddrs []ma.Multiaddr,
}
s
:=
&
Swarm
{
swarm
:
ps
.
NewSwarm
(
PSTransport
),
local
:
local
,
peers
:
peers
,
cg
:
ctxgroup
.
WithContext
(
ctx
),
swarm
:
ps
.
NewSwarm
(
PSTransport
),
local
:
local
,
peers
:
peers
,
cg
:
ctxgroup
.
WithContext
(
ctx
),
dialT
:
DialTimeout
,
}
// configure Swarm
...
...
net/swarm/swarm_addr.go
View file @
ebb51b28
...
...
@@ -22,14 +22,14 @@ func (s *Swarm) ListenAddresses() []ma.Multiaddr {
// InterfaceListenAddresses returns a list of addresses at which this swarm
// listens. It expands "any interface" addresses (/ip4/0.0.0.0, /ip6/::) to
// use the known local interfaces.
func
InterfaceListenAddresses
(
s
*
Swarm
)
([]
ma
.
Multiaddr
,
error
)
{
func
(
s
*
Swarm
)
InterfaceListenAddresses
()
([]
ma
.
Multiaddr
,
error
)
{
return
addrutil
.
ResolveUnspecifiedAddresses
(
s
.
ListenAddresses
(),
nil
)
}
// checkNATWarning checks if our observed addresses differ. if so,
// informs the user that certain things might not work yet
func
checkNATWarning
(
s
*
Swarm
,
observed
ma
.
Multiaddr
,
expected
ma
.
Multiaddr
)
{
listen
,
err
:=
InterfaceListenAddresses
(
s
)
listen
,
err
:=
s
.
InterfaceListenAddresses
()
if
err
!=
nil
{
log
.
Errorf
(
"Error retrieving swarm.InterfaceListenAddresses: %s"
,
err
)
return
...
...
net/swarm/swarm_dial.go
View file @
ebb51b28
...
...
@@ -3,7 +3,9 @@ package swarm
import
(
"errors"
"fmt"
"net"
"sync"
"time"
conn
"github.com/jbenet/go-ipfs/p2p/net/conn"
addrutil
"github.com/jbenet/go-ipfs/p2p/net/swarm/addr"
...
...
@@ -12,11 +14,28 @@ import (
context
"github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
ma
"github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr"
manet
"github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr-net"
)
// Diagram of dial sync:
//
// many callers of Dial() synched w. dials many addrs results to callers
// ----------------------\ dialsync use earliest /--------------
// -----------------------\ |----------\ /----------------
// ------------------------>------------<------- >---------<-----------------
// -----------------------| \----x \----------------
// ----------------------| \-----x \---------------
// any may fail if no addr at end
// retry dialAttempt x
// dialAttempts is the number of times a single goroutine tries to dial a
// given peer before giving up.
const dialAttempts = 3

// DialTimeout is the time budget of each individual dial attempt. It could
// be raised later, or replaced by finer-grained timeouts inside the
// subcomponents of Dial.
var DialTimeout = 10 * time.Second
// dialsync is a small object that helps manage ongoing dials.
// this way, if we receive many simultaneous dial requests, one
// can do its thing, while the rest wait.
...
...
@@ -88,6 +107,71 @@ func (ds *dialsync) Unlock(dst peer.ID) {
ds
.
lock
.
Unlock
()
}
// dialbackoff is a struct used to avoid over-dialing the same, dead peers.
// Whenever we totally time out on a peer (all three attempts), we add them
// to dialbackoff. Then, whenever goroutines would _wait_ (dialsync), they
// check dialbackoff. If it's there, they don't wait and exit promptly with
// an error. (the single goroutine that is actually dialing continues to
// dial). If a dial is successful, the peer is removed from backoff.
// Example:
//
//  for {
//  	if ok, wait := dialsync.Lock(p); !ok {
//  		if backoff.Backoff(p) {
//  			return errDialFailed
//  		}
//  		<-wait
//  		continue
//  	}
//  	defer dialsync.Unlock(p)
//  	c, err := actuallyDial(p)
//  	if err != nil {
//  		dialbackoff.AddBackoff(p)
//  		continue
//  	}
//  	dialbackoff.Clear(p)
//  }
//
type dialbackoff struct {
	// entries is the set of peers currently on backoff; it is allocated
	// lazily by init.
	entries map[peer.ID]struct{}
	// lock guards entries.
	lock sync.RWMutex
}
func
(
db
*
dialbackoff
)
init
()
{
if
db
.
entries
==
nil
{
db
.
entries
=
make
(
map
[
peer
.
ID
]
struct
{})
}
}
// Backoff returns whether the client should backoff from dialing
// peeer p
func
(
db
*
dialbackoff
)
Backoff
(
p
peer
.
ID
)
bool
{
db
.
lock
.
Lock
()
db
.
init
()
_
,
found
:=
db
.
entries
[
p
]
db
.
lock
.
Unlock
()
return
found
}
// AddBackoff lets other nodes know that we've entered backoff with
// peer p, so dialers should not wait unnecessarily. We still will
// attempt to dial with one goroutine, in case we get through.
func
(
db
*
dialbackoff
)
AddBackoff
(
p
peer
.
ID
)
{
db
.
lock
.
Lock
()
db
.
init
()
db
.
entries
[
p
]
=
struct
{}{}
db
.
lock
.
Unlock
()
}
// Clear removes a backoff record. Clients should call this after a
// successful Dial.
func
(
db
*
dialbackoff
)
Clear
(
p
peer
.
ID
)
{
db
.
lock
.
Lock
()
db
.
init
()
delete
(
db
.
entries
,
p
)
db
.
lock
.
Unlock
()
}
// Dial connects to a peer.
//
// The idea is that the client of Swarm does not need to know what network
...
...
@@ -95,6 +179,7 @@ func (ds *dialsync) Unlock(dst peer.ID) {
// This allows us to use various transport protocols, do NAT traversal/relay,
// etc. to achieve connection.
func
(
s
*
Swarm
)
Dial
(
ctx
context
.
Context
,
p
peer
.
ID
)
(
*
Conn
,
error
)
{
log
:=
log
.
Prefix
(
"swarm %s dialing %s"
,
s
.
local
,
p
)
if
p
==
s
.
local
{
return
nil
,
errors
.
New
(
"Attempted connection to self!"
)
}
...
...
@@ -118,6 +203,13 @@ func (s *Swarm) Dial(ctx context.Context, p peer.ID) (*Conn, error) {
// check if there's an ongoing dial to this peer
if
ok
,
wait
:=
s
.
dsync
.
Lock
(
p
);
!
ok
{
if
s
.
backf
.
Backoff
(
p
)
{
log
.
Debugf
(
"backoff"
)
return
nil
,
fmt
.
Errorf
(
"%s failed to dial %s, backing off."
,
s
.
local
,
p
)
}
log
.
Debugf
(
"waiting for ongoing dial"
)
select
{
case
<-
wait
:
// wait for that dial to finish.
continue
// and see if it worked (loop), OR we got an incoming dial.
...
...
@@ -128,13 +220,22 @@ func (s *Swarm) Dial(ctx context.Context, p peer.ID) (*Conn, error) {
// ok, we have been charged to dial! let's do it.
// if it succeeds, dial will add the conn to the swarm itself.
conn
,
err
=
s
.
dial
(
ctx
,
p
)
log
.
Debugf
(
"dial start"
)
ctxT
,
_
:=
context
.
WithTimeout
(
ctx
,
s
.
dialT
)
conn
,
err
=
s
.
dial
(
ctxT
,
p
)
s
.
dsync
.
Unlock
(
p
)
log
.
Debugf
(
"dial end %s"
,
conn
)
if
err
!=
nil
{
s
.
backf
.
AddBackoff
(
p
)
// let others know to backoff
continue
// ok, we failed. try again. (if loop is done, our error is output)
}
s
.
backf
.
Clear
(
p
)
// okay, no longer need to backoff
return
conn
,
nil
}
if
err
==
nil
{
err
=
fmt
.
Errorf
(
"%s failed to dial %s after %d attempts"
,
s
.
local
,
p
,
dialAttempts
)
}
return
nil
,
err
}
...
...
@@ -162,13 +263,21 @@ func (s *Swarm) dial(ctx context.Context, p peer.ID) (*Conn, error) {
remoteAddrs
=
addrutil
.
FilterUsableAddrs
(
remoteAddrs
)
// drop out any addrs that would just dial ourselves. use ListenAddresses
// as that is a more authoritative view than localAddrs.
remoteAddrs
=
addrutil
.
Subtract
(
remoteAddrs
,
s
.
ListenAddresses
())
ila
,
_
:=
s
.
InterfaceListenAddresses
()
remoteAddrs
=
addrutil
.
Subtract
(
remoteAddrs
,
ila
)
remoteAddrs
=
addrutil
.
Subtract
(
remoteAddrs
,
s
.
peers
.
Addresses
(
s
.
local
))
log
.
Debugf
(
"%s swarm dialing %s -- remote:%s local:%s"
,
s
.
local
,
p
,
remoteAddrs
,
s
.
ListenAddresses
())
if
len
(
remoteAddrs
)
==
0
{
return
nil
,
errors
.
New
(
"peer has no addresses"
)
}
// open connection to peer
d
:=
&
conn
.
Dialer
{
Dialer
:
manet
.
Dialer
{
Dialer
:
net
.
Dialer
{
Timeout
:
s
.
dialT
,
},
},
LocalPeer
:
s
.
local
,
LocalAddrs
:
localAddrs
,
PrivateKey
:
sk
,
...
...
@@ -196,34 +305,83 @@ func (s *Swarm) dial(ctx context.Context, p peer.ID) (*Conn, error) {
func
(
s
*
Swarm
)
dialAddrs
(
ctx
context
.
Context
,
d
*
conn
.
Dialer
,
p
peer
.
ID
,
remoteAddrs
[]
ma
.
Multiaddr
)
(
conn
.
Conn
,
error
)
{
// try to connect to one of the peer's known addresses.
// for simplicity, we do this sequentially.
// A future commit will do this asynchronously.
// we dial concurrently to each of the addresses, which:
// * makes the process faster overall
// * attempts to get the fastest connection available.
// * mitigates the waste of trying bad addresses
log
.
Debugf
(
"%s swarm dialing %s %s"
,
s
.
local
,
p
,
remoteAddrs
)
ctx
,
cancel
:=
context
.
WithCancel
(
ctx
)
defer
cancel
()
// cancel work when we exit func
foundConn
:=
make
(
chan
struct
{})
conns
:=
make
(
chan
conn
.
Conn
,
len
(
remoteAddrs
))
errs
:=
make
(
chan
error
,
len
(
remoteAddrs
))
//TODO: rate limiting just in case?
for
_
,
addr
:=
range
remoteAddrs
{
connC
,
err
:=
d
.
Dial
(
ctx
,
addr
,
p
)
if
err
!=
nil
{
continue
}
go
func
(
addr
ma
.
Multiaddr
)
{
connC
,
err
:=
s
.
dialAddr
(
ctx
,
d
,
p
,
addr
)
// if the connection is not to whom we thought it would be...
if
connC
.
RemotePeer
()
!=
p
{
log
.
Infof
(
"misdial to %s through %s (got %s)"
,
p
,
addr
,
connC
.
RemoteMultiaddr
())
connC
.
Close
()
continue
}
// check parent still wants our results
select
{
case
<-
foundConn
:
if
connC
!=
nil
{
connC
.
Close
()
}
return
default
:
}
if
err
!=
nil
{
errs
<-
err
}
else
if
connC
==
nil
{
errs
<-
fmt
.
Errorf
(
"failed to dial %s %s"
,
p
,
addr
)
}
else
{
conns
<-
connC
}
}(
addr
)
}
// if the connection is to ourselves...
// this can happen TONS when Loopback addrs are advertized.
// (this should be caught by two checks above, but let's just make sure.)
if
connC
.
RemotePeer
()
==
s
.
local
{
log
.
Infof
(
"misdial to %s through %s"
,
p
,
addr
)
connC
.
Close
()
continue
err
:=
fmt
.
Errorf
(
"failed to dial %s"
,
p
)
for
i
:=
0
;
i
<
len
(
remoteAddrs
);
i
++
{
select
{
case
err
=
<-
errs
:
log
.
Info
(
err
)
case
connC
:=
<-
conns
:
// take the first + return asap
close
(
foundConn
)
return
connC
,
nil
}
}
return
nil
,
err
}
func
(
s
*
Swarm
)
dialAddr
(
ctx
context
.
Context
,
d
*
conn
.
Dialer
,
p
peer
.
ID
,
addr
ma
.
Multiaddr
)
(
conn
.
Conn
,
error
)
{
log
.
Debugf
(
"%s swarm dialing %s %s"
,
s
.
local
,
p
,
addr
)
// success! we got one!
return
connC
,
nil
connC
,
err
:=
d
.
Dial
(
ctx
,
addr
,
p
)
if
err
!=
nil
{
return
nil
,
fmt
.
Errorf
(
"%s --> %s dial attempt failed: %s"
,
s
.
local
,
p
,
err
)
}
// if the connection is not to whom we thought it would be...
remotep
:=
connC
.
RemotePeer
()
if
remotep
!=
p
{
connC
.
Close
()
return
nil
,
fmt
.
Errorf
(
"misdial to %s through %s (got %s)"
,
p
,
addr
,
remotep
)
}
return
nil
,
fmt
.
Errorf
(
"failed to dial %s"
,
p
)
// if the connection is to ourselves...
// this can happen TONS when Loopback addrs are advertized.
// (this should be caught by two checks above, but let's just make sure.)
if
remotep
==
s
.
local
{
connC
.
Close
()
return
nil
,
fmt
.
Errorf
(
"misdial to %s through %s (got self)"
,
p
,
addr
)
}
// success! we got one!
return
connC
,
nil
}
// dialConnSetup is the setup logic for a connection from the dial side. it
...
...
net/swarm/swarm_net.go
View file @
ebb51b28
...
...
@@ -111,7 +111,7 @@ func (n *Network) ListenAddresses() []ma.Multiaddr {
// listens. It expands "any interface" addresses (/ip4/0.0.0.0, /ip6/::) to
// use the known local interfaces.
func
(
n
*
Network
)
InterfaceListenAddresses
()
([]
ma
.
Multiaddr
,
error
)
{
return
InterfaceListenAddresses
(
n
.
Swarm
()
)
return
n
.
Swarm
()
.
InterfaceListenAddresses
()
}
// Connectedness returns a state signaling connection capabilities
...
...
net/swarm/swarm_test.go
View file @
ebb51b28
...
...
@@ -227,6 +227,7 @@ func SubtestSwarm(t *testing.T, SwarmNum int, MsgNum int) {
func
TestSwarm
(
t
*
testing
.
T
)
{
// t.Skip("skipping for another test")
t
.
Parallel
()
// msgs := 1000
msgs
:=
100
...
...
@@ -236,6 +237,7 @@ func TestSwarm(t *testing.T) {
func
TestConnHandler
(
t
*
testing
.
T
)
{
// t.Skip("skipping for another test")
t
.
Parallel
()
ctx
:=
context
.
Background
()
swarms
:=
makeSwarms
(
ctx
,
t
,
5
)
...
...
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
.
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment