Skip to content
GitLab
Menu
Projects
Groups
Snippets
Loading...
Help
Help
Support
Community forum
Keyboard shortcuts
?
Submit feedback
Sign in / Register
Toggle navigation
Menu
Open sidebar
adam.huang
go-libp2p
Commits
d899b07f
Commit
d899b07f
authored
Apr 14, 2016
by
Jeromy
Committed by
Jeromy
May 31, 2016
Browse files
Refactor the swarm dialer
parent
df1c7738
Changes
8
Show whitespace changes
Inline
Side-by-side
p2p/net/swarm/addr/addr.go
View file @
d899b07f
...
...
@@ -42,10 +42,14 @@ func init() {
// FilterAddrs is a filter that removes certain addresses, according to filter.
// if filter returns true, the address is kept.
func
FilterAddrs
(
a
[]
ma
.
Multiaddr
,
filter
func
(
ma
.
Multiaddr
)
bool
)
[]
ma
.
Multiaddr
{
func
FilterAddrs
(
a
[]
ma
.
Multiaddr
,
filter
s
...
func
(
ma
.
Multiaddr
)
bool
)
[]
ma
.
Multiaddr
{
b
:=
make
([]
ma
.
Multiaddr
,
0
,
len
(
a
))
for
_
,
addr
:=
range
a
{
if
filter
(
addr
)
{
good
:=
true
for
_
,
filter
:=
range
filters
{
good
=
good
&&
filter
(
addr
)
}
if
good
{
b
=
append
(
b
,
addr
)
}
}
...
...
@@ -56,9 +60,11 @@ func FilterAddrs(a []ma.Multiaddr, filter func(ma.Multiaddr) bool) []ma.Multiadd
// from a list. the addresses removed are those known NOT
// to work with our network. Namely, addresses with UTP.
func
FilterUsableAddrs
(
a
[]
ma
.
Multiaddr
)
[]
ma
.
Multiaddr
{
return
FilterAddrs
(
a
,
func
(
m
ma
.
Multiaddr
)
bool
{
return
FilterAddrs
(
a
,
AddrUsableFunc
)
}
func
AddrUsableFunc
(
m
ma
.
Multiaddr
)
bool
{
return
AddrUsable
(
m
,
false
)
})
}
// AddrOverNonLocalIP returns whether the addr uses a non-local ip link
...
...
p2p/net/swarm/addr/filter.go
0 → 100644
View file @
d899b07f
package
addrutil
import
(
ma
"github.com/jbenet/go-multiaddr"
mafmt
"github.com/whyrusleeping/mafmt"
)
func
SubtractFilter
(
addrs
...
ma
.
Multiaddr
)
func
(
ma
.
Multiaddr
)
bool
{
addrmap
:=
make
(
map
[
string
]
bool
)
for
_
,
a
:=
range
addrs
{
addrmap
[
string
(
a
.
Bytes
())]
=
true
}
return
func
(
a
ma
.
Multiaddr
)
bool
{
return
!
addrmap
[
string
(
a
.
Bytes
())]
}
}
func
IsFDCostlyTransport
(
a
ma
.
Multiaddr
)
bool
{
return
mafmt
.
TCP
.
Matches
(
a
)
}
func
FilterNeg
(
f
func
(
ma
.
Multiaddr
)
bool
)
func
(
ma
.
Multiaddr
)
bool
{
return
func
(
a
ma
.
Multiaddr
)
bool
{
return
!
f
(
a
)
}
}
p2p/net/swarm/dial_test.go
View file @
d899b07f
...
...
@@ -2,7 +2,6 @@ package swarm
import
(
"net"
"sort"
"sync"
"testing"
"time"
...
...
@@ -493,38 +492,3 @@ func TestDialBackoffClears(t *testing.T) {
t
.
Log
(
"correctly cleared backoff"
)
}
}
func
mkAddr
(
t
*
testing
.
T
,
s
string
)
ma
.
Multiaddr
{
a
,
err
:=
ma
.
NewMultiaddr
(
s
)
if
err
!=
nil
{
t
.
Fatal
(
err
)
}
return
a
}
func
TestAddressSorting
(
t
*
testing
.
T
)
{
u1
:=
mkAddr
(
t
,
"/ip4/152.12.23.53/udp/1234/utp"
)
u2l
:=
mkAddr
(
t
,
"/ip4/127.0.0.1/udp/1234/utp"
)
local
:=
mkAddr
(
t
,
"/ip4/127.0.0.1/tcp/1234"
)
norm
:=
mkAddr
(
t
,
"/ip4/6.5.4.3/tcp/1234"
)
l
:=
AddrList
{
local
,
u1
,
u2l
,
norm
}
sort
.
Sort
(
l
)
if
!
l
[
0
]
.
Equal
(
u2l
)
{
t
.
Fatal
(
"expected utp local addr to be sorted first: "
,
l
[
0
])
}
if
!
l
[
1
]
.
Equal
(
u1
)
{
t
.
Fatal
(
"expected utp addr to be sorted second"
)
}
if
!
l
[
2
]
.
Equal
(
local
)
{
t
.
Fatal
(
"expected tcp localhost addr thid"
)
}
if
!
l
[
3
]
.
Equal
(
norm
)
{
t
.
Fatal
(
"expected normal addr last"
)
}
}
p2p/net/swarm/limiter.go
0 → 100644
View file @
d899b07f
package
swarm
import
(
"sync"
peer
"github.com/ipfs/go-libp2p-peer"
ma
"github.com/jbenet/go-multiaddr"
context
"golang.org/x/net/context"
conn
"github.com/ipfs/go-libp2p/p2p/net/conn"
addrutil
"github.com/ipfs/go-libp2p/p2p/net/swarm/addr"
)
type
dialResult
struct
{
Conn
conn
.
Conn
Err
error
}
type
dialJob
struct
{
addr
ma
.
Multiaddr
peer
peer
.
ID
ctx
context
.
Context
resp
chan
dialResult
success
bool
}
type
dialLimiter
struct
{
rllock
sync
.
Mutex
fdConsuming
int
fdLimit
int
waitingOnFd
[]
*
dialJob
dialFunc
func
(
context
.
Context
,
peer
.
ID
,
ma
.
Multiaddr
)
(
conn
.
Conn
,
error
)
activePerPeer
map
[
peer
.
ID
]
int
perPeerLimit
int
waitingOnPeerLimit
map
[
peer
.
ID
][]
*
dialJob
}
type
dialfunc
func
(
context
.
Context
,
peer
.
ID
,
ma
.
Multiaddr
)
(
conn
.
Conn
,
error
)
func
newDialLimiter
(
df
dialfunc
)
*
dialLimiter
{
return
newDialLimiterWithParams
(
df
,
concurrentFdDials
,
defaultPerPeerRateLimit
)
}
func
newDialLimiterWithParams
(
df
dialfunc
,
fdl
,
ppl
int
)
*
dialLimiter
{
return
&
dialLimiter
{
fdLimit
:
fdl
,
perPeerLimit
:
ppl
,
waitingOnPeerLimit
:
make
(
map
[
peer
.
ID
][]
*
dialJob
),
activePerPeer
:
make
(
map
[
peer
.
ID
]
int
),
dialFunc
:
df
,
}
}
func
(
dl
*
dialLimiter
)
finishedDial
(
dj
*
dialJob
)
{
dl
.
rllock
.
Lock
()
defer
dl
.
rllock
.
Unlock
()
// release tokens in reverse order than we take them
dl
.
activePerPeer
[
dj
.
peer
]
--
if
dl
.
activePerPeer
[
dj
.
peer
]
==
0
{
delete
(
dl
.
activePerPeer
,
dj
.
peer
)
}
waitlist
:=
dl
.
waitingOnPeerLimit
[
dj
.
peer
]
if
!
dj
.
success
&&
len
(
waitlist
)
>
0
{
next
:=
waitlist
[
0
]
if
len
(
waitlist
)
==
1
{
delete
(
dl
.
waitingOnPeerLimit
,
dj
.
peer
)
}
else
{
dl
.
waitingOnPeerLimit
[
dj
.
peer
]
=
waitlist
[
1
:
]
}
dl
.
activePerPeer
[
dj
.
peer
]
++
// just kidding, we still want this token
// can kick this off right here, dials in this list already
// have the other tokens needed
go
dl
.
executeDial
(
next
)
}
if
addrutil
.
IsFDCostlyTransport
(
dj
.
addr
)
{
dl
.
fdConsuming
--
if
len
(
dl
.
waitingOnFd
)
>
0
{
next
:=
dl
.
waitingOnFd
[
0
]
dl
.
waitingOnFd
=
dl
.
waitingOnFd
[
1
:
]
dl
.
fdConsuming
++
// now, attempt to take the 'per peer limit' token
dl
.
schedulePerPeerDial
(
next
)
}
}
}
// AddDialJob tries to take the needed tokens for starting the given dial job.
// If it acquires all needed tokens, it immediately starts the dial, otherwise
// it will put it on the waitlist for the requested token.
func
(
dl
*
dialLimiter
)
AddDialJob
(
dj
*
dialJob
)
{
dl
.
rllock
.
Lock
()
defer
dl
.
rllock
.
Unlock
()
if
addrutil
.
IsFDCostlyTransport
(
dj
.
addr
)
{
if
dl
.
fdConsuming
>=
dl
.
fdLimit
{
dl
.
waitingOnFd
=
append
(
dl
.
waitingOnFd
,
dj
)
return
}
// take token
dl
.
fdConsuming
++
}
dl
.
schedulePerPeerDial
(
dj
)
}
// executeDial calls the dialFunc, and reports the result through the response
// channel when finished. Once the response is sent it also releases all tokens
// it held during the dial.
func
(
dl
*
dialLimiter
)
executeDial
(
j
*
dialJob
)
{
defer
dl
.
finishedDial
(
j
)
con
,
err
:=
dl
.
dialFunc
(
j
.
ctx
,
j
.
peer
,
j
.
addr
)
select
{
case
j
.
resp
<-
dialResult
{
Conn
:
con
,
Err
:
err
}
:
case
<-
j
.
ctx
.
Done
()
:
}
}
func
(
dl
*
dialLimiter
)
schedulePerPeerDial
(
j
*
dialJob
)
{
if
dl
.
activePerPeer
[
j
.
peer
]
>=
dl
.
perPeerLimit
{
wlist
:=
dl
.
waitingOnPeerLimit
[
j
.
peer
]
dl
.
waitingOnPeerLimit
[
j
.
peer
]
=
append
(
wlist
,
j
)
return
}
// take second needed token and start dial!
dl
.
activePerPeer
[
j
.
peer
]
++
go
dl
.
executeDial
(
j
)
}
p2p/net/swarm/limiter_test.go
0 → 100644
View file @
d899b07f
package
swarm
import
(
"fmt"
"strconv"
"testing"
"time"
peer
"github.com/ipfs/go-libp2p-peer"
ma
"github.com/jbenet/go-multiaddr"
mafmt
"github.com/whyrusleeping/mafmt"
context
"golang.org/x/net/context"
conn
"github.com/ipfs/go-libp2p/p2p/net/conn"
)
func
mustAddr
(
t
*
testing
.
T
,
s
string
)
ma
.
Multiaddr
{
a
,
err
:=
ma
.
NewMultiaddr
(
s
)
if
err
!=
nil
{
t
.
Fatal
(
err
)
}
return
a
}
func
addrWithPort
(
t
*
testing
.
T
,
p
int
)
ma
.
Multiaddr
{
return
mustAddr
(
t
,
fmt
.
Sprintf
(
"/ip4/127.0.0.1/tcp/%d"
,
p
))
}
// in these tests I use addresses with tcp ports over a certain number to
// signify 'good' addresses that will succeed, and addresses below that number
// will fail. This lets us more easily test these different scenarios.
func
tcpPortOver
(
a
ma
.
Multiaddr
,
n
int
)
bool
{
port
,
err
:=
a
.
ValueForProtocol
(
ma
.
P_TCP
)
if
err
!=
nil
{
panic
(
err
)
}
pnum
,
err
:=
strconv
.
Atoi
(
port
)
if
err
!=
nil
{
panic
(
err
)
}
return
pnum
>
n
}
func
tryDialAddrs
(
ctx
context
.
Context
,
l
*
dialLimiter
,
p
peer
.
ID
,
addrs
[]
ma
.
Multiaddr
,
res
chan
dialResult
)
{
for
_
,
a
:=
range
addrs
{
l
.
AddDialJob
(
&
dialJob
{
ctx
:
ctx
,
peer
:
p
,
addr
:
a
,
resp
:
res
,
})
}
}
func
hangDialFunc
(
hang
chan
struct
{})
dialfunc
{
return
func
(
ctx
context
.
Context
,
p
peer
.
ID
,
a
ma
.
Multiaddr
)
(
conn
.
Conn
,
error
)
{
if
mafmt
.
UTP
.
Matches
(
a
)
{
return
conn
.
Conn
(
nil
),
nil
}
if
tcpPortOver
(
a
,
10
)
{
return
conn
.
Conn
(
nil
),
nil
}
else
{
<-
hang
return
nil
,
fmt
.
Errorf
(
"test bad dial"
)
}
}
}
func
TestLimiterBasicDials
(
t
*
testing
.
T
)
{
hang
:=
make
(
chan
struct
{})
defer
close
(
hang
)
l
:=
newDialLimiterWithParams
(
hangDialFunc
(
hang
),
concurrentFdDials
,
4
)
bads
:=
[]
ma
.
Multiaddr
{
addrWithPort
(
t
,
1
),
addrWithPort
(
t
,
2
),
addrWithPort
(
t
,
3
),
addrWithPort
(
t
,
4
),
}
good
:=
addrWithPort
(
t
,
20
)
resch
:=
make
(
chan
dialResult
)
pid
:=
peer
.
ID
(
"testpeer"
)
ctx
,
cancel
:=
context
.
WithCancel
(
context
.
Background
())
defer
cancel
()
tryDialAddrs
(
ctx
,
l
,
pid
,
bads
,
resch
)
l
.
AddDialJob
(
&
dialJob
{
ctx
:
ctx
,
peer
:
pid
,
addr
:
good
,
resp
:
resch
,
})
select
{
case
<-
resch
:
t
.
Fatal
(
"no dials should have completed!"
)
case
<-
time
.
After
(
time
.
Millisecond
*
100
)
:
}
// complete a single hung dial
hang
<-
struct
{}{}
select
{
case
r
:=
<-
resch
:
if
r
.
Err
==
nil
{
t
.
Fatal
(
"should have gotten failed dial result"
)
}
case
<-
time
.
After
(
time
.
Second
)
:
t
.
Fatal
(
"timed out waiting for dial completion"
)
}
select
{
case
r
:=
<-
resch
:
if
r
.
Err
!=
nil
{
t
.
Fatal
(
"expected second result to be success!"
)
}
case
<-
time
.
After
(
time
.
Second
)
:
}
}
func
TestFDLimiting
(
t
*
testing
.
T
)
{
hang
:=
make
(
chan
struct
{})
defer
close
(
hang
)
l
:=
newDialLimiterWithParams
(
hangDialFunc
(
hang
),
16
,
5
)
bads
:=
[]
ma
.
Multiaddr
{
addrWithPort
(
t
,
1
),
addrWithPort
(
t
,
2
),
addrWithPort
(
t
,
3
),
addrWithPort
(
t
,
4
)}
pids
:=
[]
peer
.
ID
{
"testpeer1"
,
"testpeer2"
,
"testpeer3"
,
"testpeer4"
}
good_tcp
:=
addrWithPort
(
t
,
20
)
ctx
:=
context
.
Background
()
resch
:=
make
(
chan
dialResult
)
// take all fd limit tokens with hang dials
for
_
,
pid
:=
range
pids
{
tryDialAddrs
(
ctx
,
l
,
pid
,
bads
,
resch
)
}
// these dials should work normally, but will hang because we have taken
// up all the fd limiting
for
_
,
pid
:=
range
pids
{
l
.
AddDialJob
(
&
dialJob
{
ctx
:
ctx
,
peer
:
pid
,
addr
:
good_tcp
,
resp
:
resch
,
})
}
select
{
case
<-
resch
:
t
.
Fatal
(
"no dials should have completed!"
)
case
<-
time
.
After
(
time
.
Millisecond
*
100
)
:
}
pid5
:=
peer
.
ID
(
"testpeer5"
)
utpaddr
:=
mustAddr
(
t
,
"/ip4/127.0.0.1/udp/7777/utp"
)
l
.
AddDialJob
(
&
dialJob
{
ctx
:
ctx
,
peer
:
pid5
,
addr
:
utpaddr
,
resp
:
resch
})
select
{
case
res
:=
<-
resch
:
if
res
.
Err
!=
nil
{
t
.
Fatal
(
"should have gotten successful response"
)
}
case
<-
time
.
After
(
time
.
Second
*
5
)
:
t
.
Fatal
(
"timeout waiting for utp addr success"
)
}
}
func
TestTokenRedistribution
(
t
*
testing
.
T
)
{
hangchs
:=
make
(
map
[
peer
.
ID
]
chan
struct
{})
df
:=
func
(
ctx
context
.
Context
,
p
peer
.
ID
,
a
ma
.
Multiaddr
)
(
conn
.
Conn
,
error
)
{
if
tcpPortOver
(
a
,
10
)
{
return
(
conn
.
Conn
)(
nil
),
nil
}
else
{
<-
hangchs
[
p
]
return
nil
,
fmt
.
Errorf
(
"test bad dial"
)
}
}
l
:=
newDialLimiterWithParams
(
df
,
8
,
4
)
bads
:=
[]
ma
.
Multiaddr
{
addrWithPort
(
t
,
1
),
addrWithPort
(
t
,
2
),
addrWithPort
(
t
,
3
),
addrWithPort
(
t
,
4
)}
pids
:=
[]
peer
.
ID
{
"testpeer1"
,
"testpeer2"
}
ctx
:=
context
.
Background
()
resch
:=
make
(
chan
dialResult
)
// take all fd limit tokens with hang dials
for
_
,
pid
:=
range
pids
{
hangchs
[
pid
]
=
make
(
chan
struct
{})
tryDialAddrs
(
ctx
,
l
,
pid
,
bads
,
resch
)
}
good
:=
mustAddr
(
t
,
"/ip4/127.0.0.1/tcp/1001"
)
// add a good dial job for peer 1
l
.
AddDialJob
(
&
dialJob
{
ctx
:
ctx
,
peer
:
pids
[
1
],
addr
:
good
,
resp
:
resch
,
})
select
{
case
<-
resch
:
t
.
Fatal
(
"no dials should have completed!"
)
case
<-
time
.
After
(
time
.
Millisecond
*
100
)
:
}
// unblock one dial for peer 0
hangchs
[
pids
[
0
]]
<-
struct
{}{}
select
{
case
res
:=
<-
resch
:
if
res
.
Err
==
nil
{
t
.
Fatal
(
"should have only been a failure here"
)
}
case
<-
time
.
After
(
time
.
Millisecond
*
100
)
:
t
.
Fatal
(
"expected a dial failure here"
)
}
select
{
case
<-
resch
:
t
.
Fatal
(
"no more dials should have completed!"
)
case
<-
time
.
After
(
time
.
Millisecond
*
100
)
:
}
// add a bad dial job to peer 0 to fill their rate limiter
// and test that more dials for this peer won't interfere with peer 1's successful dial incoming
l
.
AddDialJob
(
&
dialJob
{
ctx
:
ctx
,
peer
:
pids
[
0
],
addr
:
addrWithPort
(
t
,
7
),
resp
:
resch
,
})
hangchs
[
pids
[
1
]]
<-
struct
{}{}
// now one failed dial from peer 1 should get through and fail
// which will in turn unblock the successful dial on peer 1
select
{
case
res
:=
<-
resch
:
if
res
.
Err
==
nil
{
t
.
Fatal
(
"should have only been a failure here"
)
}
case
<-
time
.
After
(
time
.
Millisecond
*
100
)
:
t
.
Fatal
(
"expected a dial failure here"
)
}
select
{
case
res
:=
<-
resch
:
if
res
.
Err
!=
nil
{
t
.
Fatal
(
"should have succeeded!"
)
}
case
<-
time
.
After
(
time
.
Millisecond
*
100
)
:
t
.
Fatal
(
"should have gotten successful dial"
)
}
}
p2p/net/swarm/swarm.go
View file @
d899b07f
...
...
@@ -90,6 +90,8 @@ type Swarm struct {
proc
goprocess
.
Process
ctx
context
.
Context
bwc
metrics
.
Reporter
limiter
*
dialLimiter
}
// NewSwarm constructs a Swarm, with a Chan.
...
...
@@ -122,6 +124,8 @@ func NewSwarm(ctx context.Context, listenAddrs []ma.Multiaddr,
dialer
:
conn
.
NewDialer
(
local
,
peers
.
PrivKey
(
local
),
wrap
),
}
s
.
limiter
=
newDialLimiter
(
s
.
dialAddr
)
// configure Swarm
s
.
proc
=
goprocessctx
.
WithContextAndTeardown
(
ctx
,
s
.
teardown
)
s
.
SetConnHandler
(
nil
)
// make sure to setup our own conn handler.
...
...
@@ -155,6 +159,7 @@ func filterAddrs(listenAddrs []ma.Multiaddr) ([]ma.Multiaddr, error) {
}
listenAddrs
=
filtered
}
return
listenAddrs
,
nil
}
...
...
p2p/net/swarm/swarm_dial.go
View file @
d899b07f
package
swarm
import
(
"bytes"
"errors"
"fmt"
"sort"
"sync"
"time"
...
...
@@ -13,7 +11,6 @@ import (
conn
"github.com/ipfs/go-libp2p/p2p/net/conn"
addrutil
"github.com/ipfs/go-libp2p/p2p/net/swarm/addr"
ma
"github.com/jbenet/go-multiaddr"
"github.com/jbenet/go-multiaddr-net"
context
"golang.org/x/net/context"
)
...
...
@@ -42,6 +39,9 @@ const dialAttempts = 1
// number of concurrent outbound dials over transports that consume file descriptors
const
concurrentFdDials
=
160
// number of concurrent outbound dials to make per peer
const
defaultPerPeerRateLimit
=
8
// DialTimeout is the amount of time each dial attempt has. We can think about making
// this larger down the road, or putting more granular timeouts (i.e. within each
// subcomponent of Dial)
...
...
@@ -319,32 +319,34 @@ func (s *Swarm) dial(ctx context.Context, p peer.ID) (*Conn, error) {
log
.
Debug
(
"Dial not given PrivateKey, so WILL NOT SECURE conn."
)
}
// get remote peer addrs
remoteAddrs
:=
s
.
peers
.
Addrs
(
p
)
// make sure we can use the addresses.
remoteAddrs
=
addrutil
.
FilterUsableAddrs
(
remoteAddrs
)
// drop out any addrs that would just dial ourselves. use ListenAddresses
// as that is a more authoritative view than localAddrs.
ila
,
_
:=
s
.
InterfaceListenAddresses
()
remoteAddrs
=
addrutil
.
Subtract
(
remoteAddrs
,
ila
)
remoteAddrs
=
addrutil
.
Subtract
(
remoteAddrs
,
s
.
peers
.
Addrs
(
s
.
local
))
log
.
Debugf
(
"%s swarm dialing %s -- local:%s remote:%s"
,
s
.
local
,
p
,
s
.
ListenAddresses
(),
remoteAddrs
)
if
len
(
remoteAddrs
)
==
0
{
err
:=
errors
.
New
(
"peer has no addresses"
)
logdial
[
"error"
]
=
err
return
nil
,
err
}
remoteAddrs
=
s
.
filterAddrs
(
remoteAddrs
)
if
len
(
remoteAddrs
)
==
0
{
err
:=
errors
.
New
(
"all adresses for peer have been filtered out"
)
logdial
[
"error"
]
=
err
return
nil
,
err
}
subtract_filter
:=
addrutil
.
SubtractFilter
(
append
(
ila
,
s
.
peers
.
Addrs
(
s
.
local
)
...
)
...
)
// get live channel of addresses for peer, filtered by the given filters
/*
remoteAddrChan := s.peers.AddrsChan(ctx, p,
addrutil.AddrUsableFilter,
subtract_filter,
s.Filters.AddrBlocked)
*/
////// TEMP UNTIL PEERSTORE GETS UPGRADED
// Ref: https://github.com/ipfs/go-libp2p-peer/pull/1
paddrs
:=
s
.
peers
.
Addrs
(
p
)
good_addrs
:=
addrutil
.
FilterAddrs
(
paddrs
,
addrutil
.
AddrUsableFunc
,
subtract_filter
,
addrutil
.
FilterNeg
(
s
.
Filters
.
AddrBlocked
),
)
remoteAddrChan
:=
make
(
chan
ma
.
Multiaddr
,
len
(
good_addrs
))
for
_
,
a
:=
range
good_addrs
{
remoteAddrChan
<-
a
}
close
(
remoteAddrChan
)
/////////
// try to get a connection to any addr
connC
,
err
:=
s
.
dialAddrs
(
ctx
,
p
,
remoteAddr
s
)
connC
,
err
:=
s
.
dialAddrs
(
ctx
,
p
,
remoteAddr
Chan
)
if
err
!=
nil
{
logdial
[
"error"
]
=
err
return
nil
,
err
...
...
@@ -364,98 +366,64 @@ func (s *Swarm) dial(ctx context.Context, p peer.ID) (*Conn, error) {
return
swarmC
,
nil
}
func
(
s
*
Swarm
)
dialAddrs
(
ctx
context
.
Context
,
p
peer
.
ID
,
remoteAddrs
[]
ma
.
Multiaddr
)
(
conn
.
Conn
,
error
)
{
// sort addresses so preferred addresses are dialed sooner
sort
.
Sort
(
AddrList
(
remoteAddrs
))
// try to connect to one of the peer's known addresses.
// we dial concurrently to each of the addresses, which:
// * makes the process faster overall
// * attempts to get the fastest connection available.
// * mitigates the waste of trying bad addresses
func
(
s
*
Swarm
)
dialAddrs
(
ctx
context
.
Context
,
p
peer
.
ID
,
remoteAddrs
<-
chan
ma
.
Multiaddr
)
(
conn
.
Conn
,
error
)
{
log
.
Debugf
(
"%s swarm dialing %s %s"
,
s
.
local
,
p
,
remoteAddrs
)
ctx
,
cancel
:=
context
.
WithCancel
(
ctx
)
defer
cancel
()
// cancel work when we exit func
conns
:=
make
(
ch
an
conn
.
Conn
)
errs
:=
make
(
chan
error
,
len
(
remoteAddrs
)
)
// use a single response type instead of errs
an
d
conn
s, reduces complexity *a ton*
respch
:=
make
(
chan
dialResult
)
// dialSingleAddr is used in the rate-limited async thing below.
dialSingleAddr
:=
func
(
addr
ma
.
Multiaddr
)
{
// rebind chans in scope so we can nil them out easily
connsout
:=
conns
errsout
:=
errs
defaultDialFail
:=
fmt
.
Errorf
(
"failed to dial %s (default failure)"
,
p
)
exitErr
:=
defaultDialFail
connC
,
err
:=
s
.
dialAddr
(
ctx
,
p
,
addr
)
if
err
!=
nil
{
connsout
=
nil
}
else
if
connC
==
nil
{
// NOTE: this really should never happen
log
.
Errorf
(
"failed to dial %s %s and got no error!"
,
p
,
addr
)
err
=
fmt
.
Errorf
(
"failed to dial %s %s"
,
p
,
addr
)
connsout
=
nil
}
else
{
errsout
=
nil
}
// check parent still wants our results
var
active
int
for
{
select
{
case
<-
ctx
.
Done
()
:
if
connC
!=
nil
{
connC
.
Close
()
}
case
errsout
<-
err
:
case
connsout
<-
connC
:
case
addr
,
ok
:=
<-
remoteAddrs
:
if
!
ok
{
remoteAddrs
=
nil
if
active
==
0
{
return
nil
,
exitErr
}
continue
}
// this whole thing is in a goroutine so we can use foundConn
// to end early.
go
func
()
{
limiter
:=
make
(
chan
struct
{},
8
)
for
_
,
addr
:=
range
remoteAddrs
{
// returns whatever ratelimiting is acceptable for workerAddr.
// may not rate limit at all.
rl
:=
s
.
addrDialRateLimit
(
addr
)
select
{
case
<-
ctx
.
Done
()
:
// our context was cancelled
return
case
rl
<-
struct
{}{}
:
// take the token, move on
// limitedDial will start a dial to the given peer when
// it is able, respecting the various different types of rate
// limiting that occur without using extra goroutines per addr
s
.
limitedDial
(
ctx
,
p
,
addr
,
respch
)
active
++
case
<-
ctx
.
Done
()
:
if
exitErr
==
defaultDialFail
{
exitErr
=
ctx
.
Err
()
}
select
{
case
<-
ctx
.
Done
()
:
// our context was cancelled
return
case
limiter
<-
struct
{}{}
:
// take the token, move on
return
nil
,
exitErr
case
resp
:=
<-
respch
:
active
--
if
resp
.
Err
!=
nil
{
log
.
Error
(
"got error on dial: "
,
resp
.
Err
)
// Errors are normal, lots of dials will fail
exitErr
=
resp
.
Err
if
remoteAddrs
==
nil
&&
active
==
0
{
return
nil
,
exitErr
}
go
func
(
rlc
<-
chan
struct
{},
a
ma
.
Multiaddr
)
{
dialSingleAddr
(
a
)
<-
limiter
<-
rlc
}(
rl
,
addr
)
}
else
if
resp
.
Conn
!=
nil
{
return
resp
.
Conn
,
nil
}
}()
// wair for the results.
exitErr
:=
fmt
.
Errorf
(
"failed to dial %s"
,
p
)
for
range
remoteAddrs
{
select
{
case
exitErr
=
<-
errs
:
//
log
.
Debug
(
"dial error: "
,
exitErr
)
case
connC
:=
<-
conns
:
// take the first + return asap
return
connC
,
nil
case
<-
ctx
.
Done
()
:
// break out and return error
break
}
}
return
nil
,
exitErr
}
func
(
s
*
Swarm
)
limitedDial
(
ctx
context
.
Context
,
p
peer
.
ID
,
a
ma
.
Multiaddr
,
resp
chan
dialResult
)
{
s
.
limiter
.
AddDialJob
(
&
dialJob
{
addr
:
a
,
peer
:
p
,
resp
:
resp
,
ctx
:
ctx
,
})
}
func
(
s
*
Swarm
)
dialAddr
(
ctx
context
.
Context
,
p
peer
.
ID
,
addr
ma
.
Multiaddr
)
(
conn
.
Conn
,
error
)
{
...
...
@@ -485,16 +453,6 @@ func (s *Swarm) dialAddr(ctx context.Context, p peer.ID, addr ma.Multiaddr) (con
return
connC
,
nil
}
func
(
s
*
Swarm
)
filterAddrs
(
addrs
[]
ma
.
Multiaddr
)
[]
ma
.
Multiaddr
{
var
out
[]
ma
.
Multiaddr
for
_
,
a
:=
range
addrs
{
if
!
s
.
Filters
.
AddrBlocked
(
a
)
{
out
=
append
(
out
,
a
)
}
}
return
out
}
// dialConnSetup is the setup logic for a connection from the dial side. it
// needs to add the Conn to the StreamSwarm, then run newConnSetup
func
dialConnSetup
(
ctx
context
.
Context
,
s
*
Swarm
,
connC
conn
.
Conn
)
(
*
Conn
,
error
)
{
...
...
@@ -514,72 +472,3 @@ func dialConnSetup(ctx context.Context, s *Swarm, connC conn.Conn) (*Conn, error
return
swarmC
,
err
}
// addrDialRateLimit returns a ratelimiting channel for dialing transport
// addrs like a. for example, tcp is fd-ratelimited. utp is not ratelimited.
func
(
s
*
Swarm
)
addrDialRateLimit
(
a
ma
.
Multiaddr
)
chan
struct
{}
{
if
isFDCostlyTransport
(
a
)
{
return
s
.
fdRateLimit
}
// do not rate limit it at all
return
make
(
chan
struct
{},
1
)
}
func
isFDCostlyTransport
(
a
ma
.
Multiaddr
)
bool
{
return
isTCPMultiaddr
(
a
)
}
func
isTCPMultiaddr
(
a
ma
.
Multiaddr
)
bool
{
p
:=
a
.
Protocols
()
return
len
(
p
)
==
2
&&
(
p
[
0
]
.
Name
==
"ip4"
||
p
[
0
]
.
Name
==
"ip6"
)
&&
p
[
1
]
.
Name
==
"tcp"
}
type
AddrList
[]
ma
.
Multiaddr
func
(
al
AddrList
)
Len
()
int
{
return
len
(
al
)
}
func
(
al
AddrList
)
Swap
(
i
,
j
int
)
{
al
[
i
],
al
[
j
]
=
al
[
j
],
al
[
i
]
}
func
(
al
AddrList
)
Less
(
i
,
j
int
)
bool
{
a
:=
al
[
i
]
b
:=
al
[
j
]
// dial localhost addresses next, they should fail immediately
lba
:=
manet
.
IsIPLoopback
(
a
)
lbb
:=
manet
.
IsIPLoopback
(
b
)
if
lba
{
if
!
lbb
{
return
true
}
}
// dial utp and similar 'non-fd-consuming' addresses first
fda
:=
isFDCostlyTransport
(
a
)
fdb
:=
isFDCostlyTransport
(
b
)
if
!
fda
{
if
fdb
{
return
true
}
// if neither consume fd's, assume equal ordering
return
false
}
// if 'b' doesnt take a file descriptor
if
!
fdb
{
return
false
}
// if 'b' is loopback and both take file descriptors
if
lbb
{
return
false
}
// for the rest, just sort by bytes
return
bytes
.
Compare
(
a
.
Bytes
(),
b
.
Bytes
())
>
0
}
p2p/net/swarm/swarm_test.go
View file @
d899b07f
...
...
@@ -303,7 +303,7 @@ func TestAddrBlocking(t *testing.T) {
swarms
:=
makeSwarms
(
ctx
,
t
,
2
)
swarms
[
0
]
.
SetConnHandler
(
func
(
conn
*
Conn
)
{
t
.
Fatal
f
(
"no connections should happen! -- %s"
,
conn
)
t
.
Error
f
(
"no connections should happen! -- %s"
,
conn
)
})
_
,
block
,
err
:=
net
.
ParseCIDR
(
"127.0.0.1/8"
)
...
...
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
.
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment