Commit 126e1506 authored by Jeromy

swarm: integrate new dialsync code into swarm

parent e26950ff
@@ -415,7 +415,6 @@ func TestDialBackoff(t *testing.T) {
 		if !s1.backf.Backoff(s3p) {
 			t.Error("s3 should be on backoff")
 		}
 	}
 }
...
 package swarm

 import (
+	"context"
 	"sync"

 	peer "github.com/ipfs/go-libp2p-peer"
 	ma "github.com/jbenet/go-multiaddr"
-	context "golang.org/x/net/context"

 	conn "github.com/libp2p/go-libp2p/p2p/net/conn"
 	addrutil "github.com/libp2p/go-libp2p/p2p/net/swarm/addr"
...
 package swarm

 import (
+	"context"
 	"fmt"
 	"math/rand"
 	"strconv"
@@ -10,7 +11,6 @@ import (
 	peer "github.com/ipfs/go-libp2p-peer"
 	ma "github.com/jbenet/go-multiaddr"
 	mafmt "github.com/whyrusleeping/mafmt"
-	context "golang.org/x/net/context"

 	conn "github.com/libp2p/go-libp2p/p2p/net/conn"
 )
...
@@ -3,6 +3,7 @@
 package swarm

 import (
+	"context"
 	"fmt"
 	"io/ioutil"
 	"os"
@@ -32,7 +33,6 @@ import (
 	yamux "github.com/whyrusleeping/go-smux-yamux"
 	mafilter "github.com/whyrusleeping/multiaddr-filter"
 	ws "github.com/whyrusleeping/ws-transport"
-	context "golang.org/x/net/context"
 )

 var log = logging.Logger("swarm2")
@@ -76,7 +76,7 @@ type Swarm struct {
 	peers pstore.Peerstore
 	connh ConnHandler

-	dsync dialsync
+	dsync *DialSync
 	backf dialbackoff
 	dialT time.Duration // mainly for tests
@@ -134,6 +134,7 @@ func NewSwarm(ctx context.Context, listenAddrs []ma.Multiaddr,
 		dialer: conn.NewDialer(local, peers.PrivKey(local), wrap),
 	}

+	s.dsync = NewDialSync(s.doDial)
 	s.limiter = newDialLimiter(s.dialAddr)

 	// configure Swarm
...
 package swarm

 import (
+	"context"
 	"errors"
 	"fmt"
 	"sync"
@@ -11,7 +12,6 @@ import (
 	ma "github.com/jbenet/go-multiaddr"
 	conn "github.com/libp2p/go-libp2p/p2p/net/conn"
 	addrutil "github.com/libp2p/go-libp2p/p2p/net/swarm/addr"
-	context "golang.org/x/net/context"
 )

 // Diagram of dial sync:
@@ -53,78 +53,6 @@ const defaultPerPeerRateLimit = 8
 // subcomponent of Dial)
 var DialTimeout = time.Second * 10
-// dialsync is a small object that helps manage ongoing dials.
-// this way, if we receive many simultaneous dial requests, one
-// can do its thing, while the rest wait.
-//
-// this interface is so would-be dialers can just:
-//
-//  for {
-//  	c := findConnectionToPeer(peer)
-//  	if c != nil {
-//  		return c
-//  	}
-//
-//  	// ok, no connections. should we dial?
-//  	if ok, wait := dialsync.Lock(peer); !ok {
-//  		<-wait // can optionally wait
-//  		continue
-//  	}
-//  	defer dialsync.Unlock(peer)
-//
-//  	c := actuallyDial(peer)
-//  	return c
-//  }
-//
-type dialsync struct {
-	// ongoing is a map of tickets for the current peers being dialed.
-	// this way, we dont kick off N dials simultaneously.
-	ongoing map[peer.ID]chan struct{}
-	lock    sync.Mutex
-}
-
-// Lock governs the beginning of a dial attempt.
-// If there are no ongoing dials, it returns true, and the client is now
-// scheduled to dial. Every other goroutine that calls startDial -- with
-// the same dst -- will block until client is done. The client MUST call
-// ds.Unlock(p) when it is done, to unblock the other callers.
-// The client is not reponsible for achieving a successful dial, only for
-// reporting the end of the attempt (calling ds.Unlock(p)).
-//
-// see the example below `dialsync`
-func (ds *dialsync) Lock(dst peer.ID) (bool, chan struct{}) {
-	ds.lock.Lock()
-	if ds.ongoing == nil { // init if not ready
-		ds.ongoing = make(map[peer.ID]chan struct{})
-	}
-
-	wait, found := ds.ongoing[dst]
-	if !found {
-		ds.ongoing[dst] = make(chan struct{})
-	}
-	ds.lock.Unlock()
-
-	if found {
-		return false, wait
-	}
-
-	// ok! you're signed up to dial!
-	return true, nil
-}
-
-// Unlock releases waiters to a dial attempt. see Lock.
-// if Unlock(p) is called without calling Lock(p) first, Unlock panics.
-func (ds *dialsync) Unlock(dst peer.ID) {
-	ds.lock.Lock()
-	wait, found := ds.ongoing[dst]
-	if !found {
-		panic("called dialDone with no ongoing dials to peer: " + dst.Pretty())
-	}
-
-	delete(ds.ongoing, dst) // remove ongoing dial
-	close(wait)             // release everyone else
-	ds.lock.Unlock()
-}
-
 // dialbackoff is a struct used to avoid over-dialing the same, dead peers.
 // Whenever we totally time out on a peer (all three attempts), we add them
 // to dialbackoff. Then, whenevers goroutines would _wait_ (dialsync), they
@@ -246,8 +174,7 @@ func (s *Swarm) bestConnectionToPeer(p peer.ID) *Conn {
 // gatedDialAttempt is an attempt to dial a node. It is gated by the swarm's
 // dial synchronization systems: dialsync and dialbackoff.
 func (s *Swarm) gatedDialAttempt(ctx context.Context, p peer.ID) (*Conn, error) {
-	var logdial = lgbl.Dial("swarm", s.LocalPeer(), p, nil, nil)
-	defer log.EventBegin(ctx, "swarmDialAttemptSync", logdial).Done()
+	defer log.EventBegin(ctx, "swarmDialAttemptSync", p).Done()

 	// check if we already have an open connection first
 	conn := s.bestConnectionToPeer(p)
@@ -257,48 +184,34 @@ func (s *Swarm) gatedDialAttempt(ctx context.Context, p peer.ID) (*Conn, error)

 	// if this peer has been backed off, lets get out of here
 	if s.backf.Backoff(p) {
-		log.Event(ctx, "swarmDialBackoff", logdial)
+		log.Event(ctx, "swarmDialBackoff", p)
 		return nil, ErrDialBackoff
 	}

-	// check if there's an ongoing dial to this peer
-	if ok, wait := s.dsync.Lock(p); ok {
-		defer s.dsync.Unlock(p)
-
-		// ok, we have been charged to dial! let's do it.
-		// if it succeeds, dial will add the conn to the swarm itself.
-		defer log.EventBegin(ctx, "swarmDialAttemptStart", logdial).Done()
-		ctxT, cancel := context.WithTimeout(ctx, s.dialT)
-		conn, err := s.dial(ctxT, p)
-		cancel()
-		log.Debugf("dial end %s", conn)
-		if err != nil {
-			log.Event(ctx, "swarmDialBackoffAdd", logdial)
-			s.backf.AddBackoff(p) // let others know to backoff
-
-			// ok, we failed. try again. (if loop is done, our error is output)
-			return nil, fmt.Errorf("dial attempt failed: %s", err)
-		}
-		log.Event(ctx, "swarmDialBackoffClear", logdial)
-		s.backf.Clear(p) // okay, no longer need to backoff
-		return conn, nil
-
-	} else {
-		// we did not dial. we must wait for someone else to dial.
-		defer log.EventBegin(ctx, "swarmDialWait", logdial).Done()
-		select {
-		case <-wait: // wait for that other dial to finish.
-
-			// see if it worked, OR we got an incoming dial in the meantime...
-			conn := s.bestConnectionToPeer(p)
-			if conn != nil {
-				return conn, nil
-			}
-			return nil, ErrDialFailed
-		case <-ctx.Done(): // or we may have to bail...
-			return nil, ctx.Err()
-		}
-	}
+	return s.dsync.DialLock(ctx, p)
+}
+
+// doDial is an ugly shim method to retain all the logging and backoff logic
+// of the old dialsync code
+func (s *Swarm) doDial(ctx context.Context, p peer.ID) (*Conn, error) {
+	var logdial = lgbl.Dial("swarm", s.LocalPeer(), p, nil, nil)
+	// ok, we have been charged to dial! let's do it.
+	// if it succeeds, dial will add the conn to the swarm itself.
+	defer log.EventBegin(ctx, "swarmDialAttemptStart", logdial).Done()
+	ctxT, cancel := context.WithTimeout(ctx, s.dialT)
+	conn, err := s.dial(ctxT, p)
+	cancel()
+	log.Debugf("dial end %s", conn)
+	if err != nil {
+		log.Event(ctx, "swarmDialBackoffAdd", logdial)
+		s.backf.AddBackoff(p) // let others know to backoff
+
+		// ok, we failed. try again. (if loop is done, our error is output)
+		return nil, fmt.Errorf("dial attempt failed: %s", err)
+	}
+	log.Event(ctx, "swarmDialBackoffClear", logdial)
+	s.backf.Clear(p) // okay, no longer need to backoff
+	return conn, nil
 }

 // dial is the actual swarm's dial logic, gated by Dial.
...
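Note: NewDialSync and DialLock, which the diff wires into the swarm, come from the new dialsync code this commit integrates; that code itself is not part of this diff. As a rough, simplified sketch of the behaviour the swarm now relies on (concurrent dial requests for the same peer collapse into a single dial whose result is shared), here is a small self-contained Go program. It is illustrative only: the string peer IDs, stub Conn type, and internals below are assumptions for the example and do not reproduce the real go-libp2p implementation.

package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

// Conn is a stand-in for the swarm's *Conn type.
type Conn struct{ peer string }

// DialFunc performs the actual dial, playing the role of Swarm.doDial above.
type DialFunc func(ctx context.Context, p string) (*Conn, error)

// activeDial records one in-flight dial that callers can wait on.
type activeDial struct {
	done chan struct{} // closed when the dial completes
	conn *Conn
	err  error
}

// DialSync coalesces concurrent dial requests per peer.
type DialSync struct {
	dialFn DialFunc
	mu     sync.Mutex
	dials  map[string]*activeDial
}

func NewDialSync(fn DialFunc) *DialSync {
	return &DialSync{dialFn: fn, dials: make(map[string]*activeDial)}
}

// DialLock starts a dial to p, or joins one already in progress, and
// waits for its result or for ctx to be cancelled.
func (ds *DialSync) DialLock(ctx context.Context, p string) (*Conn, error) {
	ds.mu.Lock()
	ad, ok := ds.dials[p]
	if !ok {
		ad = &activeDial{done: make(chan struct{})}
		ds.dials[p] = ad
		go func() {
			// The real implementation manages the dial's context more
			// carefully; here the dial simply runs to completion on its own.
			ad.conn, ad.err = ds.dialFn(context.Background(), p)
			ds.mu.Lock()
			delete(ds.dials, p) // future requests may dial again
			ds.mu.Unlock()
			close(ad.done) // wake every waiter
		}()
	}
	ds.mu.Unlock()

	select {
	case <-ad.done:
		return ad.conn, ad.err
	case <-ctx.Done():
		return nil, ctx.Err()
	}
}

func main() {
	ds := NewDialSync(func(ctx context.Context, p string) (*Conn, error) {
		time.Sleep(50 * time.Millisecond) // pretend to dial
		return &Conn{peer: p}, nil
	})

	var wg sync.WaitGroup
	for i := 0; i < 3; i++ { // three concurrent requests share one dial
		wg.Add(1)
		go func() {
			defer wg.Done()
			c, err := ds.DialLock(context.Background(), "QmPeer")
			fmt.Println(c, err)
		}()
	}
	wg.Wait()
}

In the diff above, the dial function handed to NewDialSync is s.doDial, so the logging and backoff bookkeeping still run exactly once per coalesced dial attempt.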