swarm_dial.go 7.35 KB
Newer Older
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
1
2
3
4
5
package swarm

import (
	"errors"
	"fmt"
6
	"sync"
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
7
8

	conn "github.com/jbenet/go-ipfs/p2p/net/conn"
9
	addrutil "github.com/jbenet/go-ipfs/p2p/net/swarm/addr"
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
10
11
12
13
	peer "github.com/jbenet/go-ipfs/p2p/peer"
	lgbl "github.com/jbenet/go-ipfs/util/eventlog/loggables"

	context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
14
	ma "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr"
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
15
16
)

17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
// dialsync is a small object that helps manage ongoing dials.
// this way, if we receive many simultaneous dial requests, one
// can do its thing, while the rest wait.
//
// this interface is so would-be dialers can just:
//
//  for {
//  	c := findConnectionToPeer(peer)
//  	if c != nil {
//  		return c
//  	}
//
//  	// ok, no connections. should we dial?
//  	if ok, wait := dialsync.Lock(peer); !ok {
//  		<-wait // can optionally wait
//  		continue
//  	}
//  	defer dialsync.Unlock(peer)
//
//  	c := actuallyDial(peer)
//  	return c
//  }
//
type dialsync struct {
	// ongoing is a map of tickets for the current peers being dialed.
	// this way, we dont kick off N dials simultaneously.
	ongoing map[peer.ID]chan struct{}
	lock    sync.Mutex
}

// Lock governs the beginning of a dial attempt.
// If there are no ongoing dials, it returns true, and the client is now
// scheduled to dial. Every other goroutine that calls startDial -- with
//the same dst -- will block until client is done. The client MUST call
// ds.Unlock(p) when it is done, to unblock the other callers.
// The client is not reponsible for achieving a successful dial, only for
// reporting the end of the attempt (calling ds.Unlock(p)).
//
// see the example below `dialsync`
func (ds *dialsync) Lock(dst peer.ID) (bool, chan struct{}) {
	ds.lock.Lock()
	if ds.ongoing == nil { // init if not ready
		ds.ongoing = make(map[peer.ID]chan struct{})
	}
	wait, found := ds.ongoing[dst]
	if !found {
		ds.ongoing[dst] = make(chan struct{})
	}
	ds.lock.Unlock()

	if found {
		return false, wait
	}

	// ok! you're signed up to dial!
	return true, nil
}

// Unlock releases waiters to a dial attempt. see Lock.
// if Unlock(p) is called without calling Lock(p) first, Unlock panics.
func (ds *dialsync) Unlock(dst peer.ID) {
	ds.lock.Lock()
	wait, found := ds.ongoing[dst]
	if !found {
		panic("called dialDone with no ongoing dials to peer: " + dst.Pretty())
	}
	delete(ds.ongoing, dst) // remove ongoing dial
	close(wait)             // release everyone else
	ds.lock.Unlock()
}

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
88
89
90
91
92
93
94
95
96
97
98
// Dial connects to a peer.
//
// The idea is that the client of Swarm does not need to know what network
// the connection will happen over. Swarm can use whichever it choses.
// This allows us to use various transport protocols, do NAT traversal/relay,
// etc. to achive connection.
func (s *Swarm) Dial(ctx context.Context, p peer.ID) (*Conn, error) {
	if p == s.local {
		return nil, errors.New("Attempted connection to self!")
	}

99
100
101
102
103
104
105
106
107
108
	// this loop is here because dials take time, and we should not be dialing
	// the same peer concurrently (silly waste). Additonally, it's structured
	// to check s.ConnectionsToPeer(p) _first_, and _between_ attempts because we
	// may have received an incoming connection! if so, we no longer must dial.
	//
	// dial attempts. we may be doing the dialing. if not, we wait.
	attempts := 3
	var err error
	var conn *Conn
	for i := 0; i < attempts; i++ {
109
110
		// check if we already have an open connection first
		cs := s.ConnectionsToPeer(p)
111
112
113
		for _, conn = range cs {
			if conn != nil { // dump out the first one we find. (TODO pick better)
				return conn, nil
114
			}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
115
		}
116
117

		// check if there's an ongoing dial to this peer
118
		if ok, wait := s.dsync.Lock(p); !ok {
119
			select {
120
121
122
			case <-wait: // wait for that dial to finish.
				continue // and see if it worked (loop), OR we got an incoming dial.
			case <-ctx.Done(): // or we may have to bail...
123
124
125
126
				return nil, ctx.Err()
			}
		}

127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
		// ok, we have been charged to dial! let's do it.
		conn, err = s.dial(ctx, p)
		s.dsync.Unlock(p)
		if err != nil {
			continue // ok, we failed. try again. (if loop is done, our error is output)
		}
		return conn, nil
	}
	return nil, err
}

// dial is the actual swarm's dial logic, gated by Dial.
func (s *Swarm) dial(ctx context.Context, p peer.ID) (*Conn, error) {
	if p == s.local {
		return nil, errors.New("Attempted connection to self!")
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
142
143
144
145
146
147
148
149
	}

	sk := s.peers.PrivKey(s.local)
	if sk == nil {
		// may be fine for sk to be nil, just log a warning.
		log.Warning("Dial not given PrivateKey, so WILL NOT SECURE conn.")
	}

150
151
152
153
154
155
156
	// get our own addrs
	localAddrs := s.peers.Addresses(s.local)
	if len(localAddrs) == 0 {
		log.Debug("Dialing out with no local addresses.")
	}

	// get remote peer addrs
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
157
	remoteAddrs := s.peers.Addresses(p)
158
	// make sure we can use the addresses.
159
	remoteAddrs = addrutil.FilterUsableAddrs(remoteAddrs)
160
161
162
	// drop out any addrs that would just dial ourselves. use ListenAddresses
	// as that is a more authoritative view than localAddrs.
	remoteAddrs = addrutil.Subtract(remoteAddrs, s.ListenAddresses())
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
163
164
165
166
167
168
169
170
171
172
173
	if len(remoteAddrs) == 0 {
		return nil, errors.New("peer has no addresses")
	}

	// open connection to peer
	d := &conn.Dialer{
		LocalPeer:  s.local,
		LocalAddrs: localAddrs,
		PrivateKey: sk,
	}

174
175
	// try to get a connection to any addr
	connC, err := s.dialAddrs(ctx, d, p, remoteAddrs)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
	if err != nil {
		return nil, err
	}

	// ok try to setup the new connection.
	swarmC, err := dialConnSetup(ctx, s, connC)
	if err != nil {
		log.Error("Dial newConnSetup failed. disconnecting.")
		log.Event(ctx, "dialFailureDisconnect", lgbl.NetConn(connC), lgbl.Error(err))
		swarmC.Close() // close the connection. didn't work out :(
		return nil, err
	}

	log.Event(ctx, "dial", p)
	return swarmC, nil
}

193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
func (s *Swarm) dialAddrs(ctx context.Context, d *conn.Dialer, p peer.ID, remoteAddrs []ma.Multiaddr) (conn.Conn, error) {

	// try to connect to one of the peer's known addresses.
	// for simplicity, we do this sequentially.
	// A future commit will do this asynchronously.
	for _, addr := range remoteAddrs {
		connC, err := d.Dial(ctx, addr, p)
		if err != nil {
			continue
		}

		// if the connection is not to whom we thought it would be...
		if connC.RemotePeer() != p {
			log.Infof("misdial to %s through %s (got %s)", p, addr, connC.RemoteMultiaddr())
			connC.Close()
			continue
		}

		// if the connection is to ourselves...
		// this can happen TONS when Loopback addrs are advertized.
		// (this should be caught by two checks above, but let's just make sure.)
		if connC.RemotePeer() == s.local {
			log.Infof("misdial to %s through %s", p, addr)
			connC.Close()
			continue
		}

		// success! we got one!
		return connC, nil
	}
	return nil, fmt.Errorf("failed to dial %s", p)
}

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
// dialConnSetup is the setup logic for a connection from the dial side. it
// needs to add the Conn to the StreamSwarm, then run newConnSetup
func dialConnSetup(ctx context.Context, s *Swarm, connC conn.Conn) (*Conn, error) {

	psC, err := s.swarm.AddConn(connC)
	if err != nil {
		// connC is closed by caller if we fail.
		return nil, fmt.Errorf("failed to add conn to ps.Swarm: %s", err)
	}

	// ok try to setup the new connection. (newConnSetup will add to group)
	swarmC, err := s.newConnSetup(ctx, psC)
	if err != nil {
		log.Error("Dial newConnSetup failed. disconnecting.")
		log.Event(ctx, "dialFailureDisconnect", lgbl.NetConn(connC), lgbl.Error(err))
		swarmC.Close() // we need to call this to make sure psC is Closed.
		return nil, err
	}

	return swarmC, err
}