Commit 93169571 authored by Juan Batiz-Benet
Browse files

Merge pull request #676 from jbenet/dial-events

p2p/net: dial log -> events
parents 9510082e 837fca7b
...@@ -11,6 +11,7 @@ import ( ...@@ -11,6 +11,7 @@ import (
ma "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr" ma "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr"
manet "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr-net" manet "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr-net"
reuseport "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-reuseport" reuseport "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-reuseport"
lgbl "github.com/jbenet/go-ipfs/util/eventlog/loggables"
addrutil "github.com/jbenet/go-ipfs/p2p/net/swarm/addr" addrutil "github.com/jbenet/go-ipfs/p2p/net/swarm/addr"
peer "github.com/jbenet/go-ipfs/p2p/peer" peer "github.com/jbenet/go-ipfs/p2p/peer"
...@@ -26,9 +27,14 @@ func (d *Dialer) String() string { ...@@ -26,9 +27,14 @@ func (d *Dialer) String() string {
// Ensures raddr is part of peer.Addresses() // Ensures raddr is part of peer.Addresses()
// Example: d.DialAddr(ctx, peer.Addresses()[0], peer) // Example: d.DialAddr(ctx, peer.Addresses()[0], peer)
func (d *Dialer) Dial(ctx context.Context, raddr ma.Multiaddr, remote peer.ID) (Conn, error) { func (d *Dialer) Dial(ctx context.Context, raddr ma.Multiaddr, remote peer.ID) (Conn, error) {
logdial := lgbl.Dial("conn", d.LocalPeer, remote, nil, raddr)
logdial["encrypted"] = (d.PrivateKey != nil) // log whether this will be an encrypted dial or not.
defer log.EventBegin(ctx, "connDial", logdial).Done()
maconn, err := d.rawConnDial(ctx, raddr, remote) maconn, err := d.rawConnDial(ctx, raddr, remote)
if err != nil { if err != nil {
logdial["dial"] = "failure"
logdial["error"] = err
return nil, err return nil, err
} }
...@@ -51,6 +57,7 @@ func (d *Dialer) Dial(ctx context.Context, raddr ma.Multiaddr, remote peer.ID) ( ...@@ -51,6 +57,7 @@ func (d *Dialer) Dial(ctx context.Context, raddr ma.Multiaddr, remote peer.ID) (
connOut = c connOut = c
return return
} }
c2, err := newSecureConn(ctx, d.PrivateKey, c) c2, err := newSecureConn(ctx, d.PrivateKey, c)
if err != nil { if err != nil {
errOut = err errOut = err
...@@ -64,12 +71,20 @@ func (d *Dialer) Dial(ctx context.Context, raddr ma.Multiaddr, remote peer.ID) ( ...@@ -64,12 +71,20 @@ func (d *Dialer) Dial(ctx context.Context, raddr ma.Multiaddr, remote peer.ID) (
select { select {
case <-ctx.Done(): case <-ctx.Done():
maconn.Close() maconn.Close()
logdial["error"] = ctx.Err()
return nil, ctx.Err() return nil, ctx.Err()
case <-done: case <-done:
// whew, finished. // whew, finished.
} }
return connOut, errOut if errOut != nil {
logdial["error"] = errOut
logdial["dial"] = "failure"
return nil, errOut
}
logdial["dial"] = "success"
return connOut, nil
} }
// rawConnDial dials the underlying net.Conn + manet.Conns // rawConnDial dials the underlying net.Conn + manet.Conns
...@@ -82,12 +97,14 @@ func (d *Dialer) rawConnDial(ctx context.Context, raddr ma.Multiaddr, remote pee ...@@ -82,12 +97,14 @@ func (d *Dialer) rawConnDial(ctx context.Context, raddr ma.Multiaddr, remote pee
} }
if strings.HasPrefix(raddr.String(), "/ip4/0.0.0.0") { if strings.HasPrefix(raddr.String(), "/ip4/0.0.0.0") {
log.Event(ctx, "connDialZeroAddr", lgbl.Dial("conn", d.LocalPeer, remote, nil, raddr))
return nil, debugerror.Errorf("Attempted to connect to zero address: %s", raddr) return nil, debugerror.Errorf("Attempted to connect to zero address: %s", raddr)
} }
// get local addr to use. // get local addr to use.
laddr := pickLocalAddr(d.LocalAddrs, raddr) laddr := pickLocalAddr(d.LocalAddrs, raddr)
log.Debugf("%s dialing %s -- %s --> %s", d.LocalPeer, remote, laddr, raddr) logdial := lgbl.Dial("conn", d.LocalPeer, remote, laddr, raddr)
defer log.EventBegin(ctx, "connDialRawConn", logdial).Done()
// make a copy of the manet.Dialer, we may need to change its timeout. // make a copy of the manet.Dialer, we may need to change its timeout.
madialer := d.Dialer madialer := d.Dialer
...@@ -99,19 +116,27 @@ func (d *Dialer) rawConnDial(ctx context.Context, raddr ma.Multiaddr, remote pee ...@@ -99,19 +116,27 @@ func (d *Dialer) rawConnDial(ctx context.Context, raddr ma.Multiaddr, remote pee
// dial using reuseport.Dialer, because we're probably reusing addrs. // dial using reuseport.Dialer, because we're probably reusing addrs.
// this is optimistic, as the reuseDial may fail to bind the port. // this is optimistic, as the reuseDial may fail to bind the port.
rpev := log.EventBegin(ctx, "connDialReusePort", logdial)
if nconn, retry, reuseErr := reuseDial(madialer.Dialer, laddr, raddr); reuseErr == nil { if nconn, retry, reuseErr := reuseDial(madialer.Dialer, laddr, raddr); reuseErr == nil {
// if it worked, wrap the raw net.Conn with our manet.Conn // if it worked, wrap the raw net.Conn with our manet.Conn
log.Debugf("%s reuse worked! %s %s %s", d.LocalPeer, laddr, nconn.RemoteAddr(), nconn) logdial["reuseport"] = "success"
rpev.Done()
return manet.WrapNetConn(nconn) return manet.WrapNetConn(nconn)
} else if !retry { } else if !retry {
// reuseDial is sure this is a legitimate dial failure, not a reuseport failure. // reuseDial is sure this is a legitimate dial failure, not a reuseport failure.
logdial["reuseport"] = "failure"
logdial["error"] = reuseErr
rpev.Done()
return nil, reuseErr return nil, reuseErr
} else { } else {
// this is a failure to reuse port. log it. // this is a failure to reuse port. log it.
log.Debugf("%s port reuse failed: %s --> %s -- %s", d.LocalPeer, laddr, raddr, reuseErr) logdial["reuseport"] = "retry"
logdial["error"] = reuseErr
rpev.Done()
} }
} }
defer log.EventBegin(ctx, "connDialManet", logdial).Done()
return madialer.Dial(raddr) return madialer.Dial(raddr)
} }
......
...@@ -28,8 +28,16 @@ import ( ...@@ -28,8 +28,16 @@ import (
// any may fail if no addr at end // any may fail if no addr at end
// retry dialAttempt x // retry dialAttempt x
var (
ErrDialBackoff = errors.New("dial backoff")
ErrDialFailed = errors.New("dial attempt failed")
ErrDialToSelf = errors.New("dial to self attempted")
)
// dialAttempts governs how many times a goroutine will try to dial a given peer. // dialAttempts governs how many times a goroutine will try to dial a given peer.
const dialAttempts = 3 // Note: this is down to one, as we have _too many dials_ atm. To add back in,
// add loop back in Dial(.)
const dialAttempts = 1
// DialTimeout is the amount of time each dial attempt has. We can think about making // DialTimeout is the amount of time each dial attempt has. We can think about making
// this larger down the road, or putting more granular timeouts (i.e. within each // this larger down the road, or putting more granular timeouts (i.e. within each
...@@ -179,75 +187,97 @@ func (db *dialbackoff) Clear(p peer.ID) { ...@@ -179,75 +187,97 @@ func (db *dialbackoff) Clear(p peer.ID) {
// This allows us to use various transport protocols, do NAT traversal/relay, // This allows us to use various transport protocols, do NAT traversal/relay,
// etc. to achieve connection. // etc. to achieve connection.
func (s *Swarm) Dial(ctx context.Context, p peer.ID) (*Conn, error) { func (s *Swarm) Dial(ctx context.Context, p peer.ID) (*Conn, error) {
var logdial = lgbl.Dial("swarm", s.LocalPeer(), p, nil, nil)
if p == s.local { if p == s.local {
return nil, errors.New("Attempted connection to self!") log.Event(ctx, "swarmDialSelf", logdial)
} return nil, ErrDialToSelf
}
// this loop is here because dials take time, and we should not be dialing
// the same peer concurrently (silly waste). Additionally, it's structured return s.gatedDialAttempt(ctx, p)
// to check s.ConnectionsToPeer(p) _first_, and _between_ attempts because we }
// may have received an incoming connection! if so, we no longer must dial.
// func (s *Swarm) bestConnectionToPeer(p peer.ID) *Conn {
// During the dial attempts, we may be doing the dialing. if not, we wait.
var err error
var conn *Conn
for i := 0; i < dialAttempts; i++ {
// check if we already have an open connection first
cs := s.ConnectionsToPeer(p) cs := s.ConnectionsToPeer(p)
for _, conn = range cs { for _, conn := range cs {
if conn != nil { // dump out the first one we find. (TODO pick better) if conn != nil { // dump out the first one we find. (TODO pick better)
return conn, nil return conn
} }
} }
return nil
}
// check if there's an ongoing dial to this peer // gatedDialAttempt is an attempt to dial a node. It is gated by the swarm's
if ok, wait := s.dsync.Lock(p); !ok { // dial synchronization systems: dialsync and dialbackoff.
func (s *Swarm) gatedDialAttempt(ctx context.Context, p peer.ID) (*Conn, error) {
if s.backf.Backoff(p) { var logdial = lgbl.Dial("swarm", s.LocalPeer(), p, nil, nil)
log.Debugf("backoff") defer log.EventBegin(ctx, "swarmDialAttemptSync", logdial).Done()
return nil, fmt.Errorf("%s failed to dial %s, backing off.", s.local, p)
}
log.Debugf("waiting for ongoing dial") // check if we already have an open connection first
select { conn := s.bestConnectionToPeer(p)
case <-wait: // wait for that dial to finish. if conn != nil {
continue // and see if it worked (loop), OR we got an incoming dial. return conn, nil
case <-ctx.Done(): // or we may have to bail...
return nil, ctx.Err()
}
} }
// check if there's an ongoing dial to this peer
if ok, wait := s.dsync.Lock(p); ok {
// ok, we have been charged to dial! let's do it. // ok, we have been charged to dial! let's do it.
// if it succeeds, dial will add the conn to the swarm itself. // if it succeeds, dial will add the conn to the swarm itself.
log.Debugf("dial start")
defer log.EventBegin(ctx, "swarmDialAttemptStart", logdial).Done()
ctxT, _ := context.WithTimeout(ctx, s.dialT) ctxT, _ := context.WithTimeout(ctx, s.dialT)
conn, err = s.dial(ctxT, p) conn, err := s.dial(ctxT, p)
s.dsync.Unlock(p) s.dsync.Unlock(p)
log.Debugf("dial end %s", conn) log.Debugf("dial end %s", conn)
if err != nil { if err != nil {
log.Event(ctx, "swarmDialBackoffAdd", logdial)
s.backf.AddBackoff(p) // let others know to backoff s.backf.AddBackoff(p) // let others know to backoff
continue // ok, we failed. try again. (if loop is done, our error is output) return nil, ErrDialFailed // ok, we failed. try again. (if loop is done, our error is output)
} }
log.Event(ctx, "swarmDialBackoffClear", logdial)
s.backf.Clear(p) // okay, no longer need to backoff s.backf.Clear(p) // okay, no longer need to backoff
return conn, nil return conn, nil
} else {
// we did not dial. we must wait for someone else to dial.
// check whether we should backoff first...
if s.backf.Backoff(p) {
log.Event(ctx, "swarmDialBackoff", logdial)
return nil, ErrDialBackoff
}
defer log.EventBegin(ctx, "swarmDialWait", logdial).Done()
select {
case <-wait: // wait for that other dial to finish.
// see if it worked, OR we got an incoming dial in the meantime...
conn := s.bestConnectionToPeer(p)
if conn != nil {
return conn, nil
}
return nil, ErrDialFailed
case <-ctx.Done(): // or we may have to bail...
return nil, ctx.Err()
} }
if err == nil {
err = fmt.Errorf("%s failed to dial %s after %d attempts", s.local, p, dialAttempts)
} }
return nil, err
} }
// dial is the actual swarm's dial logic, gated by Dial. // dial is the actual swarm's dial logic, gated by Dial.
func (s *Swarm) dial(ctx context.Context, p peer.ID) (*Conn, error) { func (s *Swarm) dial(ctx context.Context, p peer.ID) (*Conn, error) {
var logdial = lgbl.Dial("swarm", s.LocalPeer(), p, nil, nil)
if p == s.local { if p == s.local {
return nil, errors.New("Attempted connection to self!") log.Event(ctx, "swarmDialDoDialSelf", logdial)
return nil, ErrDialToSelf
} }
defer log.EventBegin(ctx, "swarmDialDo", logdial).Done()
logdial["dial"] = "failure" // start off with failure. set to "success" at the end.
sk := s.peers.PrivKey(s.local) sk := s.peers.PrivKey(s.local)
logdial["encrypted"] = (sk != nil) // log whether this will be an encrypted dial or not.
if sk == nil { if sk == nil {
// may be fine for sk to be nil, just log a warning. // fine for sk to be nil, just log.
log.Warning("Dial not given PrivateKey, so WILL NOT SECURE conn.") log.Debug("Dial not given PrivateKey, so WILL NOT SECURE conn.")
} }
// get our own addrs. try dialing out from our listener addresses (reusing ports) // get our own addrs. try dialing out from our listener addresses (reusing ports)
...@@ -269,7 +299,9 @@ func (s *Swarm) dial(ctx context.Context, p peer.ID) (*Conn, error) { ...@@ -269,7 +299,9 @@ func (s *Swarm) dial(ctx context.Context, p peer.ID) (*Conn, error) {
remoteAddrs = addrutil.Subtract(remoteAddrs, s.peers.Addresses(s.local)) remoteAddrs = addrutil.Subtract(remoteAddrs, s.peers.Addresses(s.local))
log.Debugf("%s swarm dialing %s -- remote:%s local:%s", s.local, p, remoteAddrs, s.ListenAddresses()) log.Debugf("%s swarm dialing %s -- remote:%s local:%s", s.local, p, remoteAddrs, s.ListenAddresses())
if len(remoteAddrs) == 0 { if len(remoteAddrs) == 0 {
return nil, errors.New("peer has no addresses") err := errors.New("peer has no addresses")
logdial["error"] = err
return nil, err
} }
// open connection to peer // open connection to peer
...@@ -287,19 +319,21 @@ func (s *Swarm) dial(ctx context.Context, p peer.ID) (*Conn, error) { ...@@ -287,19 +319,21 @@ func (s *Swarm) dial(ctx context.Context, p peer.ID) (*Conn, error) {
// try to get a connection to any addr // try to get a connection to any addr
connC, err := s.dialAddrs(ctx, d, p, remoteAddrs) connC, err := s.dialAddrs(ctx, d, p, remoteAddrs)
if err != nil { if err != nil {
logdial["error"] = err
return nil, err return nil, err
} }
logdial["netconn"] = lgbl.NetConn(connC)
// ok try to setup the new connection. // ok try to setup the new connection.
defer log.EventBegin(ctx, "swarmDialDoSetup", logdial, lgbl.NetConn(connC)).Done()
swarmC, err := dialConnSetup(ctx, s, connC) swarmC, err := dialConnSetup(ctx, s, connC)
if err != nil { if err != nil {
log.Debug("Dial newConnSetup failed. disconnecting.") logdial["error"] = err
log.Event(ctx, "dialFailureDisconnect", lgbl.NetConn(connC), lgbl.Error(err))
connC.Close() // close the connection. didn't work out :( connC.Close() // close the connection. didn't work out :(
return nil, err return nil, err
} }
log.Event(ctx, "dial", p) logdial["dial"] = "success"
return swarmC, nil return swarmC, nil
} }
...@@ -398,8 +432,6 @@ func dialConnSetup(ctx context.Context, s *Swarm, connC conn.Conn) (*Conn, error ...@@ -398,8 +432,6 @@ func dialConnSetup(ctx context.Context, s *Swarm, connC conn.Conn) (*Conn, error
// ok try to setup the new connection. (newConnSetup will add to group) // ok try to setup the new connection. (newConnSetup will add to group)
swarmC, err := s.newConnSetup(ctx, psC) swarmC, err := s.newConnSetup(ctx, psC)
if err != nil { if err != nil {
log.Debug("Dial newConnSetup failed. disconnecting.")
log.Event(ctx, "dialFailureDisconnect", lgbl.NetConn(connC), lgbl.Error(err))
psC.Close() // we need to make sure psC is Closed. psC.Close() // we need to make sure psC is Closed.
return nil, err return nil, err
} }
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment