Commit 51709847 authored by Juan Batiz-Benet's avatar Juan Batiz-Benet
Browse files

p2p/net/swarm: rate limit dials. max of 10 addrs at a time.

This will mitigate the fd explosion, but slow down dials majorly
as any peer with more addresses than the rate limit will have
to wait a whole dial timeout (~15s)
parent 0f4e8fb4
...@@ -15,6 +15,9 @@ import ( ...@@ -15,6 +15,9 @@ import (
context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context" context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
ma "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr" ma "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr"
manet "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr-net" manet "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr-net"
process "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/goprocess"
procctx "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/goprocess/context"
ratelimit "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/goprocess/ratelimit"
) )
// Diagram of dial sync: // Diagram of dial sync:
...@@ -353,43 +356,60 @@ func (s *Swarm) dialAddrs(ctx context.Context, d *conn.Dialer, p peer.ID, remote ...@@ -353,43 +356,60 @@ func (s *Swarm) dialAddrs(ctx context.Context, d *conn.Dialer, p peer.ID, remote
conns := make(chan conn.Conn, len(remoteAddrs)) conns := make(chan conn.Conn, len(remoteAddrs))
errs := make(chan error, len(remoteAddrs)) errs := make(chan error, len(remoteAddrs))
//TODO: rate limiting just in case? // dialSingleAddr is used in the rate-limited async thing below.
for _, addr := range remoteAddrs { dialSingleAddr := func(addr ma.Multiaddr) {
go func(addr ma.Multiaddr) { connC, err := s.dialAddr(ctx, d, p, addr)
connC, err := s.dialAddr(ctx, d, p, addr)
// check parent still wants our results // check parent still wants our results
select { select {
case <-foundConn: case <-foundConn:
if connC != nil { if connC != nil {
connC.Close() connC.Close()
}
return
default:
} }
return
default:
}
if err != nil { if err != nil {
errs <- err errs <- err
} else if connC == nil { } else if connC == nil {
errs <- fmt.Errorf("failed to dial %s %s", p, addr) errs <- fmt.Errorf("failed to dial %s %s", p, addr)
} else { } else {
conns <- connC conns <- connC
} }
}(addr)
} }
err := fmt.Errorf("failed to dial %s", p) // this whole thing is in a goroutine so we can use foundConn
// to end early.
go func() {
// rate limiting just in case. at most 10 addrs at once.
limiter := ratelimit.NewRateLimiter(procctx.WithContext(ctx), 10)
for _, addr := range remoteAddrs {
select {
case <-foundConn: // if one of them succeeded already
break
default:
}
workerAddr := addr // shadow variable to avoid race
limiter.Go(func(worker process.Process) {
dialSingleAddr(workerAddr)
})
}
}()
// wair fot the results.
exitErr := fmt.Errorf("failed to dial %s", p)
for i := 0; i < len(remoteAddrs); i++ { for i := 0; i < len(remoteAddrs); i++ {
select { select {
case err = <-errs: case exitErr = <-errs: //
log.Debug(err) log.Debug(exitErr)
case connC := <-conns: case connC := <-conns:
// take the first + return asap // take the first + return asap
close(foundConn) close(foundConn)
return connC, nil return connC, nil
} }
} }
return nil, err return nil, exitErr
} }
func (s *Swarm) dialAddr(ctx context.Context, d *conn.Dialer, p peer.ID, addr ma.Multiaddr) (conn.Conn, error) { func (s *Swarm) dialAddr(ctx context.Context, d *conn.Dialer, p peer.ID, addr ma.Multiaddr) (conn.Conn, error) {
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment