Commit 4a2bc51c authored by Jeromy Johnson

Merge pull request #38 from ipfs/sketch/dial-redo

refactor swarm dialing logic
parents 52855747 6dddefe2
@@ -40,12 +40,16 @@ func init() {
 	SupportedTransportProtocols = transports
 }
 
-// FilterAddrs is a filter that removes certain addresses, according to filter.
-// if filter returns true, the address is kept.
-func FilterAddrs(a []ma.Multiaddr, filter func(ma.Multiaddr) bool) []ma.Multiaddr {
+// FilterAddrs is a filter that removes certain addresses, according to the given filters.
+// if all filters return true, the address is kept.
+func FilterAddrs(a []ma.Multiaddr, filters ...func(ma.Multiaddr) bool) []ma.Multiaddr {
 	b := make([]ma.Multiaddr, 0, len(a))
 	for _, addr := range a {
-		if filter(addr) {
+		good := true
+		for _, filter := range filters {
+			good = good && filter(addr)
+		}
+
+		if good {
 			b = append(b, addr)
 		}
 	}
@@ -56,9 +60,11 @@ func FilterAddrs(a []ma.Multiaddr, filter func(ma.Multiaddr) bool) []ma.Multiaddr {
 // from a list. the addresses removed are those known NOT
 // to work with our network. Namely, addresses with UTP.
 func FilterUsableAddrs(a []ma.Multiaddr) []ma.Multiaddr {
-	return FilterAddrs(a, func(m ma.Multiaddr) bool {
-		return AddrUsable(m, false)
-	})
+	return FilterAddrs(a, AddrUsableFunc)
+}
+
+func AddrUsableFunc(m ma.Multiaddr) bool {
+	return AddrUsable(m, false)
 }
 
 // AddrOverNonLocalIP returns whether the addr uses a non-local ip link
...
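For illustration, here is a minimal sketch (not from this commit) of how the variadic FilterAddrs composes predicates; the mafmt.TCP predicate is an arbitrary example:

	// Sketch: an address survives FilterAddrs only if every filter accepts it.
	func exampleComposedFilters() []ma.Multiaddr {
		tcp, _ := ma.NewMultiaddr("/ip4/6.5.4.3/tcp/1234")
		utp, _ := ma.NewMultiaddr("/ip4/6.5.4.3/udp/1234/utp")

		return FilterAddrs(
			[]ma.Multiaddr{tcp, utp},
			AddrUsableFunc,    // keep only addrs usable on our network
			mafmt.TCP.Matches, // example extra predicate: keep TCP only
		) // -> just the tcp addr
	}

Since the filters are ANDed together, their ordering affects readability only, not the result.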
package addrutil

import (
	ma "github.com/jbenet/go-multiaddr"
	mafmt "github.com/whyrusleeping/mafmt"
)

// SubtractFilter returns a filter func that filters out all of the given addresses
func SubtractFilter(addrs ...ma.Multiaddr) func(ma.Multiaddr) bool {
	addrmap := make(map[string]bool)
	for _, a := range addrs {
		addrmap[string(a.Bytes())] = true
	}

	return func(a ma.Multiaddr) bool {
		return !addrmap[string(a.Bytes())]
	}
}

// IsFDCostlyTransport returns true for transports that require a new file
// descriptor per connection created
func IsFDCostlyTransport(a ma.Multiaddr) bool {
	return mafmt.TCP.Matches(a)
}

// FilterNeg returns a negated version of the passed in filter
func FilterNeg(f func(ma.Multiaddr) bool) func(ma.Multiaddr) bool {
	return func(a ma.Multiaddr) bool {
		return !f(a)
	}
}
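These helpers are meant to be fed into the variadic FilterAddrs; here is a hedged sketch (not from this commit) of the composition the dial path uses later in this diff, where addrs, ourAddrs, and blocked are placeholder parameters:

	// Sketch: keep addresses that are usable, not our own, and not blocked.
	func exampleDialFilter(addrs, ourAddrs []ma.Multiaddr, blocked func(ma.Multiaddr) bool) []ma.Multiaddr {
		return FilterAddrs(addrs,
			AddrUsableFunc,              // drop addrs we know we cannot dial
			SubtractFilter(ourAddrs...), // drop our own addrs
			FilterNeg(blocked),          // drop addrs the blocklist matches
		)
	}

SubtractFilter keys its map on the raw multiaddr bytes, so two equal addresses parsed separately still match.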
@@ -2,7 +2,6 @@ package swarm
 import (
 	"net"
-	"sort"
 	"sync"
 	"testing"
 	"time"
@@ -494,38 +493,3 @@ func TestDialBackoffClears(t *testing.T) {
 		t.Log("correctly cleared backoff")
 	}
 }
-
-func mkAddr(t *testing.T, s string) ma.Multiaddr {
-	a, err := ma.NewMultiaddr(s)
-	if err != nil {
-		t.Fatal(err)
-	}
-
-	return a
-}
-
-func TestAddressSorting(t *testing.T) {
-	u1 := mkAddr(t, "/ip4/152.12.23.53/udp/1234/utp")
-	u2l := mkAddr(t, "/ip4/127.0.0.1/udp/1234/utp")
-	local := mkAddr(t, "/ip4/127.0.0.1/tcp/1234")
-	norm := mkAddr(t, "/ip4/6.5.4.3/tcp/1234")
-
-	l := AddrList{local, u1, u2l, norm}
-	sort.Sort(l)
-
-	if !l[0].Equal(u2l) {
-		t.Fatal("expected utp local addr to be sorted first: ", l[0])
-	}
-
-	if !l[1].Equal(u1) {
-		t.Fatal("expected utp addr to be sorted second")
-	}
-
-	if !l[2].Equal(local) {
-		t.Fatal("expected tcp localhost addr thid")
-	}
-
-	if !l[3].Equal(norm) {
-		t.Fatal("expected normal addr last")
-	}
-}
package swarm

import (
	"sync"

	peer "github.com/ipfs/go-libp2p-peer"
	ma "github.com/jbenet/go-multiaddr"
	context "golang.org/x/net/context"

	conn "github.com/ipfs/go-libp2p/p2p/net/conn"
	addrutil "github.com/ipfs/go-libp2p/p2p/net/swarm/addr"
)
type dialResult struct {
	Conn conn.Conn
	Err  error
}

type dialJob struct {
	addr    ma.Multiaddr
	peer    peer.ID
	ctx     context.Context
	resp    chan dialResult
	success bool
}

// cancelled reports whether the job's context has already been
// cancelled, without blocking.
func (dj *dialJob) cancelled() bool {
	select {
	case <-dj.ctx.Done():
		return true
	default:
		return false
	}
}
type dialLimiter struct {
	rllock sync.Mutex

	// file descriptor limiting: dials over fd-costly transports (e.g. TCP)
	// may not exceed fdLimit at once; the rest queue in waitingOnFd
	fdConsuming int
	fdLimit     int
	waitingOnFd []*dialJob

	dialFunc func(context.Context, peer.ID, ma.Multiaddr) (conn.Conn, error)

	// per-peer limiting: each peer may have at most perPeerLimit dials
	// in flight; the rest queue in waitingOnPeerLimit
	activePerPeer      map[peer.ID]int
	perPeerLimit       int
	waitingOnPeerLimit map[peer.ID][]*dialJob
}
type dialfunc func(context.Context, peer.ID, ma.Multiaddr) (conn.Conn, error)

func newDialLimiter(df dialfunc) *dialLimiter {
	return newDialLimiterWithParams(df, concurrentFdDials, defaultPerPeerRateLimit)
}

func newDialLimiterWithParams(df dialfunc, fdl, ppl int) *dialLimiter {
	return &dialLimiter{
		fdLimit:            fdl,
		perPeerLimit:       ppl,
		waitingOnPeerLimit: make(map[peer.ID][]*dialJob),
		activePerPeer:      make(map[peer.ID]int),
		dialFunc:           df,
	}
}
// finishedDial releases the tokens dj was holding and, where other jobs
// are queued waiting on those tokens, starts the next one.
func (dl *dialLimiter) finishedDial(dj *dialJob) {
	dl.rllock.Lock()
	defer dl.rllock.Unlock()

	if addrutil.IsFDCostlyTransport(dj.addr) {
		dl.fdConsuming--
		if len(dl.waitingOnFd) > 0 {
			next := dl.waitingOnFd[0]
			dl.waitingOnFd = dl.waitingOnFd[1:]
			if len(dl.waitingOnFd) == 0 {
				dl.waitingOnFd = nil // clear out memory
			}

			dl.fdConsuming++
			go dl.executeDial(next)
		}
	}

	// release tokens in the reverse order from which we took them
	dl.activePerPeer[dj.peer]--
	if dl.activePerPeer[dj.peer] == 0 {
		delete(dl.activePerPeer, dj.peer)
	}

	waitlist := dl.waitingOnPeerLimit[dj.peer]
	if !dj.success && len(waitlist) > 0 {
		next := waitlist[0]
		if len(waitlist) == 1 {
			delete(dl.waitingOnPeerLimit, dj.peer)
		} else {
			dl.waitingOnPeerLimit[dj.peer] = waitlist[1:]
		}

		dl.activePerPeer[dj.peer]++ // just kidding, we still want this token

		// can kick this off right here, dials in this list already
		// have the other tokens needed
		go dl.executeDial(next)
	}
}
// AddDialJob tries to take the needed tokens for starting the given dial job.
// If it acquires all needed tokens, it immediately starts the dial, otherwise
// it will put it on the waitlist for the requested token.
func (dl *dialLimiter) AddDialJob(dj *dialJob) {
dl.rllock.Lock()
defer dl.rllock.Unlock()
if dl.activePerPeer[dj.peer] >= dl.perPeerLimit {
wlist := dl.waitingOnPeerLimit[dj.peer]
dl.waitingOnPeerLimit[dj.peer] = append(wlist, dj)
return
}
dl.activePerPeer[dj.peer]++
if addrutil.IsFDCostlyTransport(dj.addr) {
if dl.fdConsuming >= dl.fdLimit {
dl.waitingOnFd = append(dl.waitingOnFd, dj)
return
}
// take token
dl.fdConsuming++
}
// take second needed token and start dial!
go dl.executeDial(dj)
}
func (dl *dialLimiter) schedulePerPeerDial(j *dialJob) {
if dl.activePerPeer[j.peer] >= dl.perPeerLimit {
wlist := dl.waitingOnPeerLimit[j.peer]
dl.waitingOnPeerLimit[j.peer] = append(wlist, j)
return
}
// take second needed token and start dial!
dl.activePerPeer[j.peer]++
go dl.executeDial(j)
}
// executeDial calls the dialFunc, and reports the result through the response
// channel when finished. Once the response is sent it also releases all tokens
// it held during the dial.
func (dl *dialLimiter) executeDial(j *dialJob) {
defer dl.finishedDial(j)
if j.cancelled() {
return
}
con, err := dl.dialFunc(j.ctx, j.peer, j.addr)
select {
case j.resp <- dialResult{Conn: con, Err: err}:
case <-j.ctx.Done():
}
}
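To make the limiter's contract concrete, here is a hedged sketch of driving it directly, written as it could appear in a test in this package; the stub dial function, the limits, and the address are illustrative, not part of the commit:

	func TestLimiterContractSketch(t *testing.T) {
		// stub dial function: fails immediately, never touches the network
		df := func(ctx context.Context, p peer.ID, a ma.Multiaddr) (conn.Conn, error) {
			return nil, fmt.Errorf("stub dial")
		}

		// at most 2 concurrent fd-costly dials, 1 concurrent dial per peer
		l := newDialLimiterWithParams(df, 2, 1)

		a, err := ma.NewMultiaddr("/ip4/127.0.0.1/tcp/80")
		if err != nil {
			t.Fatal(err)
		}

		resp := make(chan dialResult)
		l.AddDialJob(&dialJob{ctx: context.Background(), peer: peer.ID("p1"), addr: a, resp: resp})

		// every job that is not cancelled reports exactly once on resp
		if res := <-resp; res.Err == nil {
			t.Fatal("expected the stub dial error")
		}
	}

Note that executeDial delivers its result before the deferred finishedDial releases tokens, so queued jobs only advance once a receiver drains resp.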
package swarm
import (
"fmt"
"math/rand"
"strconv"
"testing"
"time"
peer "github.com/ipfs/go-libp2p-peer"
ma "github.com/jbenet/go-multiaddr"
mafmt "github.com/whyrusleeping/mafmt"
context "golang.org/x/net/context"
conn "github.com/ipfs/go-libp2p/p2p/net/conn"
)
func mustAddr(t *testing.T, s string) ma.Multiaddr {
a, err := ma.NewMultiaddr(s)
if err != nil {
t.Fatal(err)
}
return a
}
func addrWithPort(t *testing.T, p int) ma.Multiaddr {
return mustAddr(t, fmt.Sprintf("/ip4/127.0.0.1/tcp/%d", p))
}
// in these tests I use addresses with tcp ports over a certain number to
// signify 'good' addresses that will succeed, and addresses below that number
// will fail. This lets us more easily test these different scenarios.
func tcpPortOver(a ma.Multiaddr, n int) bool {
port, err := a.ValueForProtocol(ma.P_TCP)
if err != nil {
panic(err)
}
pnum, err := strconv.Atoi(port)
if err != nil {
panic(err)
}
return pnum > n
}
func tryDialAddrs(ctx context.Context, l *dialLimiter, p peer.ID, addrs []ma.Multiaddr, res chan dialResult) {
for _, a := range addrs {
l.AddDialJob(&dialJob{
ctx: ctx,
peer: p,
addr: a,
resp: res,
})
}
}
func hangDialFunc(hang chan struct{}) dialfunc {
return func(ctx context.Context, p peer.ID, a ma.Multiaddr) (conn.Conn, error) {
if mafmt.UTP.Matches(a) {
return conn.Conn(nil), nil
}
if tcpPortOver(a, 10) {
return conn.Conn(nil), nil
} else {
<-hang
return nil, fmt.Errorf("test bad dial")
}
}
}
func TestLimiterBasicDials(t *testing.T) {
hang := make(chan struct{})
defer close(hang)
l := newDialLimiterWithParams(hangDialFunc(hang), concurrentFdDials, 4)
bads := []ma.Multiaddr{addrWithPort(t, 1), addrWithPort(t, 2), addrWithPort(t, 3), addrWithPort(t, 4)}
good := addrWithPort(t, 20)
resch := make(chan dialResult)
pid := peer.ID("testpeer")
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
tryDialAddrs(ctx, l, pid, bads, resch)
l.AddDialJob(&dialJob{
ctx: ctx,
peer: pid,
addr: good,
resp: resch,
})
select {
case <-resch:
t.Fatal("no dials should have completed!")
case <-time.After(time.Millisecond * 100):
}
// complete a single hung dial
hang <- struct{}{}
select {
case r := <-resch:
if r.Err == nil {
t.Fatal("should have gotten failed dial result")
}
case <-time.After(time.Second):
t.Fatal("timed out waiting for dial completion")
}
select {
case r := <-resch:
if r.Err != nil {
t.Fatal("expected second result to be success!")
}
case <-time.After(time.Second):
}
}
func TestFDLimiting(t *testing.T) {
hang := make(chan struct{})
defer close(hang)
l := newDialLimiterWithParams(hangDialFunc(hang), 16, 5)
bads := []ma.Multiaddr{addrWithPort(t, 1), addrWithPort(t, 2), addrWithPort(t, 3), addrWithPort(t, 4)}
pids := []peer.ID{"testpeer1", "testpeer2", "testpeer3", "testpeer4"}
good_tcp := addrWithPort(t, 20)
ctx := context.Background()
resch := make(chan dialResult)
// take all fd limit tokens with hang dials
for _, pid := range pids {
tryDialAddrs(ctx, l, pid, bads, resch)
}
// these dials should work normally, but will hang because we have taken
// up all the fd limiting
for _, pid := range pids {
l.AddDialJob(&dialJob{
ctx: ctx,
peer: pid,
addr: good_tcp,
resp: resch,
})
}
select {
case <-resch:
t.Fatal("no dials should have completed!")
case <-time.After(time.Millisecond * 100):
}
pid5 := peer.ID("testpeer5")
utpaddr := mustAddr(t, "/ip4/127.0.0.1/udp/7777/utp")
// This should complete immediately since utp addresses aren't blocked by fd rate limiting
l.AddDialJob(&dialJob{ctx: ctx, peer: pid5, addr: utpaddr, resp: resch})
select {
case res := <-resch:
if res.Err != nil {
t.Fatal("should have gotten successful response")
}
case <-time.After(time.Second * 5):
t.Fatal("timeout waiting for utp addr success")
}
}
func TestTokenRedistribution(t *testing.T) {
hangchs := make(map[peer.ID]chan struct{})
df := func(ctx context.Context, p peer.ID, a ma.Multiaddr) (conn.Conn, error) {
if tcpPortOver(a, 10) {
return (conn.Conn)(nil), nil
} else {
<-hangchs[p]
return nil, fmt.Errorf("test bad dial")
}
}
l := newDialLimiterWithParams(df, 8, 4)
bads := []ma.Multiaddr{addrWithPort(t, 1), addrWithPort(t, 2), addrWithPort(t, 3), addrWithPort(t, 4)}
pids := []peer.ID{"testpeer1", "testpeer2"}
ctx := context.Background()
resch := make(chan dialResult)
// take all fd limit tokens with hang dials
for _, pid := range pids {
hangchs[pid] = make(chan struct{})
tryDialAddrs(ctx, l, pid, bads, resch)
}
good := mustAddr(t, "/ip4/127.0.0.1/tcp/1001")
// add a good dial job for peer 1
l.AddDialJob(&dialJob{
ctx: ctx,
peer: pids[1],
addr: good,
resp: resch,
})
select {
case <-resch:
t.Fatal("no dials should have completed!")
case <-time.After(time.Millisecond * 100):
}
// unblock one dial for peer 0
hangchs[pids[0]] <- struct{}{}
select {
case res := <-resch:
if res.Err == nil {
t.Fatal("should have only been a failure here")
}
case <-time.After(time.Millisecond * 100):
t.Fatal("expected a dial failure here")
}
select {
case <-resch:
t.Fatal("no more dials should have completed!")
case <-time.After(time.Millisecond * 100):
}
// add a bad dial job to peer 0 to fill their rate limiter
// and test that more dials for this peer won't interfere with peer 1's successful dial incoming
l.AddDialJob(&dialJob{
ctx: ctx,
peer: pids[0],
addr: addrWithPort(t, 7),
resp: resch,
})
hangchs[pids[1]] <- struct{}{}
// now one failed dial from peer 1 should get through and fail
// which will in turn unblock the successful dial on peer 1
select {
case res := <-resch:
if res.Err == nil {
t.Fatal("should have only been a failure here")
}
case <-time.After(time.Millisecond * 100):
t.Fatal("expected a dial failure here")
}
select {
case res := <-resch:
if res.Err != nil {
t.Fatal("should have succeeded!")
}
case <-time.After(time.Millisecond * 100):
t.Fatal("should have gotten successful dial")
}
}
func TestStressLimiter(t *testing.T) {
df := func(ctx context.Context, p peer.ID, a ma.Multiaddr) (conn.Conn, error) {
if tcpPortOver(a, 1000) {
return conn.Conn(nil), nil
} else {
time.Sleep(time.Millisecond * time.Duration(5+rand.Intn(100)))
return nil, fmt.Errorf("test bad dial")
}
}
l := newDialLimiterWithParams(df, 20, 5)
var bads []ma.Multiaddr
for i := 0; i < 100; i++ {
bads = append(bads, addrWithPort(t, i))
}
addresses := append(bads, addrWithPort(t, 2000))
success := make(chan struct{})
for i := 0; i < 20; i++ {
go func(id peer.ID) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
resp := make(chan dialResult)
time.Sleep(time.Duration(rand.Intn(10)) * time.Millisecond)
for _, i := range rand.Perm(len(addresses)) {
l.AddDialJob(&dialJob{
addr: addresses[i],
ctx: ctx,
peer: id,
resp: resp,
})
}
for res := range resp {
if res.Err == nil {
success <- struct{}{}
return
}
}
}(peer.ID(fmt.Sprintf("testpeer%d", i)))
}
for i := 0; i < 20; i++ {
select {
case <-success:
case <-time.After(time.Second * 5):
t.Fatal("expected a success within five seconds")
}
}
}
@@ -91,6 +91,8 @@ type Swarm struct {
 	proc goprocess.Process
 	ctx  context.Context
 	bwc  metrics.Reporter
+
+	limiter *dialLimiter
 }
 
 // NewSwarm constructs a Swarm, with a Chan.
@@ -123,6 +125,8 @@ func NewSwarm(ctx context.Context, listenAddrs []ma.Multiaddr,
 		dialer: conn.NewDialer(local, peers.PrivKey(local), wrap),
 	}
 
+	s.limiter = newDialLimiter(s.dialAddr)
+
 	// configure Swarm
 	s.proc = goprocessctx.WithContextAndTeardown(ctx, s.teardown)
 	s.SetConnHandler(nil) // make sure to setup our own conn handler.
@@ -156,6 +160,7 @@ func filterAddrs(listenAddrs []ma.Multiaddr) ([]ma.Multiaddr, error) {
 		}
 		listenAddrs = filtered
 	}
+
 	return listenAddrs, nil
 }
...
 package swarm
 
 import (
-	"bytes"
 	"errors"
 	"fmt"
-	"sort"
 	"sync"
 	"time"
@@ -13,7 +11,6 @@ import (
 	conn "github.com/ipfs/go-libp2p/p2p/net/conn"
 	addrutil "github.com/ipfs/go-libp2p/p2p/net/swarm/addr"
 	ma "github.com/jbenet/go-multiaddr"
-	"github.com/jbenet/go-multiaddr-net"
 	context "golang.org/x/net/context"
 )
@@ -42,6 +39,9 @@ const dialAttempts = 1
 // number of concurrent outbound dials over transports that consume file descriptors
 const concurrentFdDials = 160
 
+// number of concurrent outbound dials to make per peer
+const defaultPerPeerRateLimit = 8
+
 // DialTimeout is the amount of time each dial attempt has. We can think about making
 // this larger down the road, or putting more granular timeouts (i.e. within each
 // subcomponent of Dial)
@@ -319,32 +319,40 @@ func (s *Swarm) dial(ctx context.Context, p peer.ID) (*Conn, error) {
 		log.Debug("Dial not given PrivateKey, so WILL NOT SECURE conn.")
 	}
 
-	// get remote peer addrs
-	remoteAddrs := s.peers.Addrs(p)
-	// make sure we can use the addresses.
-	remoteAddrs = addrutil.FilterUsableAddrs(remoteAddrs)
-	// drop out any addrs that would just dial ourselves. use ListenAddresses
-	// as that is a more authoritative view than localAddrs.
 	ila, _ := s.InterfaceListenAddresses()
-	remoteAddrs = addrutil.Subtract(remoteAddrs, ila)
-	remoteAddrs = addrutil.Subtract(remoteAddrs, s.peers.Addrs(s.local))
-
-	log.Debugf("%s swarm dialing %s -- local:%s remote:%s", s.local, p, s.ListenAddresses(), remoteAddrs)
-	if len(remoteAddrs) == 0 {
-		err := errors.New("peer has no addresses")
-		logdial["error"] = err
-		return nil, err
-	}
-
-	remoteAddrs = s.filterAddrs(remoteAddrs)
-	if len(remoteAddrs) == 0 {
-		err := errors.New("all adresses for peer have been filtered out")
-		logdial["error"] = err
-		return nil, err
-	}
+	subtract_filter := addrutil.SubtractFilter(append(ila, s.peers.Addrs(s.local)...)...)
+
+	// get live channel of addresses for peer, filtered by the given filters
+	/*
+		remoteAddrChan := s.peers.AddrsChan(ctx, p,
+			addrutil.AddrUsableFilter,
+			subtract_filter,
+			s.Filters.AddrBlocked)
+	*/
+
+	//////
+	/*
+		This code is temporary, the peerstore can currently provide
+		a channel as an interface for receiving addresses, but more thought
+		needs to be put into the execution. For now, this allows us to use
+		the improved rate limiter, while maintaining the outward behaviour
+		that we previously had (halting a dial when we run out of addrs)
+	*/
+	paddrs := s.peers.Addrs(p)
+	good_addrs := addrutil.FilterAddrs(paddrs,
+		addrutil.AddrUsableFunc,
+		subtract_filter,
+		addrutil.FilterNeg(s.Filters.AddrBlocked),
+	)
+	remoteAddrChan := make(chan ma.Multiaddr, len(good_addrs))
+	for _, a := range good_addrs {
+		remoteAddrChan <- a
+	}
+	close(remoteAddrChan)
+	/////////
 
 	// try to get a connection to any addr
-	connC, err := s.dialAddrs(ctx, p, remoteAddrs)
+	connC, err := s.dialAddrs(ctx, p, remoteAddrChan)
 	if err != nil {
 		logdial["error"] = err
 		return nil, err
@@ -364,98 +372,64 @@ func (s *Swarm) dial(ctx context.Context, p peer.ID) (*Conn, error) {
 	return swarmC, nil
 }
 
-func (s *Swarm) dialAddrs(ctx context.Context, p peer.ID, remoteAddrs []ma.Multiaddr) (conn.Conn, error) {
-	// sort addresses so preferred addresses are dialed sooner
-	sort.Sort(AddrList(remoteAddrs))
-
-	// try to connect to one of the peer's known addresses.
-	// we dial concurrently to each of the addresses, which:
-	// * makes the process faster overall
-	// * attempts to get the fastest connection available.
-	// * mitigates the waste of trying bad addresses
+func (s *Swarm) dialAddrs(ctx context.Context, p peer.ID, remoteAddrs <-chan ma.Multiaddr) (conn.Conn, error) {
 	log.Debugf("%s swarm dialing %s %s", s.local, p, remoteAddrs)
 
 	ctx, cancel := context.WithCancel(ctx)
 	defer cancel() // cancel work when we exit func
 
-	conns := make(chan conn.Conn)
-	errs := make(chan error, len(remoteAddrs))
-
-	// dialSingleAddr is used in the rate-limited async thing below.
-	dialSingleAddr := func(addr ma.Multiaddr) {
-		// rebind chans in scope so we can nil them out easily
-		connsout := conns
-		errsout := errs
-
-		connC, err := s.dialAddr(ctx, p, addr)
-		if err != nil {
-			connsout = nil
-		} else if connC == nil {
-			// NOTE: this really should never happen
-			log.Errorf("failed to dial %s %s and got no error!", p, addr)
-			err = fmt.Errorf("failed to dial %s %s", p, addr)
-			connsout = nil
-		} else {
-			errsout = nil
-		}
-
-		// check parent still wants our results
-		select {
-		case <-ctx.Done():
-			if connC != nil {
-				connC.Close()
-			}
-		case errsout <- err:
-		case connsout <- connC:
-		}
-	}
-
-	// this whole thing is in a goroutine so we can use foundConn
-	// to end early.
-	go func() {
-		limiter := make(chan struct{}, 8)
-		for _, addr := range remoteAddrs {
-			// returns whatever ratelimiting is acceptable for workerAddr.
-			// may not rate limit at all.
-			rl := s.addrDialRateLimit(addr)
-			select {
-			case <-ctx.Done(): // our context was cancelled
-				return
-			case rl <- struct{}{}:
-				// take the token, move on
-			}
-
-			select {
-			case <-ctx.Done(): // our context was cancelled
-				return
-			case limiter <- struct{}{}:
-				// take the token, move on
-			}
-
-			go func(rlc <-chan struct{}, a ma.Multiaddr) {
-				dialSingleAddr(a)
-				<-limiter
-				<-rlc
-			}(rl, addr)
-		}
-	}()
-
-	// wair for the results.
-	exitErr := fmt.Errorf("failed to dial %s", p)
-	for range remoteAddrs {
-		select {
-		case exitErr = <-errs: //
-			log.Debug("dial error: ", exitErr)
-		case connC := <-conns:
-			// take the first + return asap
-			return connC, nil
-		case <-ctx.Done():
-			// break out and return error
-			break
-		}
-	}
-	return nil, exitErr
+	// use a single response type instead of errs and conns, reduces complexity *a ton*
+	respch := make(chan dialResult)
+
+	defaultDialFail := fmt.Errorf("failed to dial %s (default failure)", p)
+	exitErr := defaultDialFail
+
+	var active int
+	for {
+		select {
+		case addr, ok := <-remoteAddrs:
+			if !ok {
+				remoteAddrs = nil
+				if active == 0 {
+					return nil, exitErr
+				}
+				continue
+			}
+
+			s.limitedDial(ctx, p, addr, respch)
+			active++
+		case <-ctx.Done():
+			if exitErr == defaultDialFail {
+				exitErr = ctx.Err()
+			}
+			return nil, exitErr
+		case resp := <-respch:
+			active--
+			if resp.Err != nil {
+				log.Info("got error on dial: ", resp.Err)
+				// Errors are normal, lots of dials will fail
+				exitErr = resp.Err
+
+				if remoteAddrs == nil && active == 0 {
+					return nil, exitErr
+				}
+			} else if resp.Conn != nil {
+				return resp.Conn, nil
+			}
+		}
+	}
+}
+
+// limitedDial will start a dial to the given peer when
+// it is able, respecting the various different types of rate
+// limiting that occur without using extra goroutines per addr
+func (s *Swarm) limitedDial(ctx context.Context, p peer.ID, a ma.Multiaddr, resp chan dialResult) {
+	s.limiter.AddDialJob(&dialJob{
+		addr: a,
+		peer: p,
+		resp: resp,
+		ctx:  ctx,
+	})
 }
 
 func (s *Swarm) dialAddr(ctx context.Context, p peer.ID, addr ma.Multiaddr) (conn.Conn, error) {
@@ -485,16 +459,6 @@ func (s *Swarm) dialAddr(ctx context.Context, p peer.ID, addr ma.Multiaddr) (conn.Conn, error) {
 	return connC, nil
 }
 
-func (s *Swarm) filterAddrs(addrs []ma.Multiaddr) []ma.Multiaddr {
-	var out []ma.Multiaddr
-	for _, a := range addrs {
-		if !s.Filters.AddrBlocked(a) {
-			out = append(out, a)
-		}
-	}
-	return out
-}
-
 // dialConnSetup is the setup logic for a connection from the dial side. it
 // needs to add the Conn to the StreamSwarm, then run newConnSetup
 func dialConnSetup(ctx context.Context, s *Swarm, connC conn.Conn) (*Conn, error) {
@@ -514,72 +478,3 @@ func dialConnSetup(ctx context.Context, s *Swarm, connC conn.Conn) (*Conn, error) {
 	return swarmC, err
 }
-
-// addrDialRateLimit returns a ratelimiting channel for dialing transport
-// addrs like a. for example, tcp is fd-ratelimited. utp is not ratelimited.
-func (s *Swarm) addrDialRateLimit(a ma.Multiaddr) chan struct{} {
-	if isFDCostlyTransport(a) {
-		return s.fdRateLimit
-	}
-
-	// do not rate limit it at all
-	return make(chan struct{}, 1)
-}
-
-func isFDCostlyTransport(a ma.Multiaddr) bool {
-	return isTCPMultiaddr(a)
-}
-
-func isTCPMultiaddr(a ma.Multiaddr) bool {
-	p := a.Protocols()
-	return len(p) == 2 && (p[0].Name == "ip4" || p[0].Name == "ip6") && p[1].Name == "tcp"
-}
-
-type AddrList []ma.Multiaddr
-
-func (al AddrList) Len() int {
-	return len(al)
-}
-
-func (al AddrList) Swap(i, j int) {
-	al[i], al[j] = al[j], al[i]
-}
-
-func (al AddrList) Less(i, j int) bool {
-	a := al[i]
-	b := al[j]
-
-	// dial localhost addresses next, they should fail immediately
-	lba := manet.IsIPLoopback(a)
-	lbb := manet.IsIPLoopback(b)
-	if lba {
-		if !lbb {
-			return true
-		}
-	}
-
-	// dial utp and similar 'non-fd-consuming' addresses first
-	fda := isFDCostlyTransport(a)
-	fdb := isFDCostlyTransport(b)
-	if !fda {
-		if fdb {
-			return true
-		}
-
-		// if neither consume fd's, assume equal ordering
-		return false
-	}
-
-	// if 'b' doesnt take a file descriptor
-	if !fdb {
-		return false
-	}
-
-	// if 'b' is loopback and both take file descriptors
-	if lbb {
-		return false
-	}
-
-	// for the rest, just sort by bytes
-	return bytes.Compare(a.Bytes(), b.Bytes()) > 0
-}
@@ -304,7 +304,7 @@ func TestAddrBlocking(t *testing.T) {
 	swarms := makeSwarms(ctx, t, 2)
 
 	swarms[0].SetConnHandler(func(conn *Conn) {
-		t.Fatalf("no connections should happen! -- %s", conn)
+		t.Errorf("no connections should happen! -- %s", conn)
 	})
 
 	_, block, err := net.ParseCIDR("127.0.0.1/8")
...
 {
+	"name":"go-libp2p",
 	"author": "whyrusleeping",
 	"bugs": {
 		"url": "https://github.com/ipfs/go-libp2p"
...