swarm_dial.go 13.6 KB
Newer Older
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
1
2
3
4
5
package swarm

import (
	"errors"
	"fmt"
6
	"sync"
7
	"time"
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
8

Jeromy's avatar
Jeromy committed
9
10
	lgbl "github.com/ipfs/go-libp2p-loggables"
	peer "github.com/ipfs/go-libp2p-peer"
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
11
12
	conn "github.com/ipfs/go-libp2p/p2p/net/conn"
	addrutil "github.com/ipfs/go-libp2p/p2p/net/swarm/addr"
Jeromy's avatar
Jeromy committed
13
14
	ma "github.com/jbenet/go-multiaddr"
	context "golang.org/x/net/context"
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
15
16
)

17
18
19
20
21
22
23
24
25
26
27
// Diagram of dial sync:
//
//   many callers of Dial()   synched w.  dials many addrs       results to callers
//  ----------------------\    dialsync    use earliest            /--------------
//  -----------------------\              |----------\           /----------------
//  ------------------------>------------<-------     >---------<-----------------
//  -----------------------|              \----x                 \----------------
//  ----------------------|                \-----x                \---------------
//                                         any may fail          if no addr at end
//                                                             retry dialAttempt x

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
28
29
30
31
32
33
// Sentinel errors returned by the swarm's dial path.
var (
	// ErrDialBackoff is returned when the peer is currently in dial backoff,
	// so no dial is attempted (and waiters do not wait).
	ErrDialBackoff = errors.New("dial backoff")
	// ErrDialFailed is returned to a caller that waited on another
	// goroutine's dial which finished without leaving a usable connection.
	ErrDialFailed  = errors.New("dial attempt failed")
	// ErrDialToSelf is returned when the peer to dial is the local peer.
	ErrDialToSelf  = errors.New("dial to self attempted")
)

34
// dialAttempts governs how many times a goroutine will try to dial a given peer.
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
35
36
37
// Note: this is currently down to one, because we were making too many
// dials. To allow more attempts again, add the retry loop back in Dial.
const dialAttempts = 1
38

Jeromy's avatar
Jeromy committed
39
40
41
// concurrentFdDials is the number of concurrent outbound dials over
// transports that consume file descriptors.
const concurrentFdDials = 160

Jeromy's avatar
Jeromy committed
42
43
44
// defaultPerPeerRateLimit is the number of concurrent outbound dials to make
// per peer.
const defaultPerPeerRateLimit = 8

45
46
47
// DialTimeout is the amount of time each dial attempt has. We can think about making
// this larger down the road, or putting more granular timeouts (i.e. within each
// subcomponent of Dial)
48
var DialTimeout = 10 * time.Second // inferred type is time.Duration
49

50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
// dialsync is a small object that helps manage ongoing dials.
// This way, if we receive many simultaneous dial requests for the same
// peer, one caller can do its thing while the rest wait.
//
// The interface is designed so that would-be dialers can just:
//
//  for {
//  	c := findConnectionToPeer(peer)
//  	if c != nil {
//  		return c
//  	}
//
//  	// ok, no connections. should we dial?
//  	if ok, wait := dialsync.Lock(peer); !ok {
//  		<-wait // can optionally wait
//  		continue
//  	}
//  	defer dialsync.Unlock(peer)
//
//  	return actuallyDial(peer)
//  }
//
// The zero value is usable: Lock allocates the map on first use.
type dialsync struct {
	// ongoing is a map of tickets for the peers currently being dialed.
	// Each ticket is a channel that Unlock closes when the dial ends.
	// This way, we don't kick off N dials to one peer simultaneously.
	ongoing map[peer.ID]chan struct{}
	// lock guards ongoing.
	lock    sync.Mutex
}

// Lock governs the beginning of a dial attempt to dst.
//
// If there is no ongoing dial to dst, Lock registers the caller as the
// dialer and returns (true, nil). The caller MUST call ds.Unlock(dst) when
// its attempt is done, to release the other callers. The caller is not
// responsible for achieving a successful dial, only for reporting the end of
// the attempt (calling ds.Unlock(dst)).
//
// If a dial to dst is already ongoing, Lock returns false together with a
// channel that is closed when that dial ends. Lock itself does not wait for
// the other dial; the caller decides whether to wait on the channel.
//
// See the usage example in the comment on dialsync.
func (ds *dialsync) Lock(dst peer.ID) (bool, chan struct{}) {
	ds.lock.Lock()
	defer ds.lock.Unlock()

	// init if not ready
	if ds.ongoing == nil {
		ds.ongoing = make(map[peer.ID]chan struct{})
	}

	if ticket, dialing := ds.ongoing[dst]; dialing {
		// someone else is already dialing dst; hand back their ticket.
		return false, ticket
	}

	// ok! you're signed up to dial!
	ds.ongoing[dst] = make(chan struct{})
	return true, nil
}

// Unlock releases waiters to a dial attempt. see Lock.
// if Unlock(p) is called without calling Lock(p) first, Unlock panics.
func (ds *dialsync) Unlock(dst peer.ID) {
	ds.lock.Lock()
	wait, found := ds.ongoing[dst]
	if !found {
		panic("called dialDone with no ongoing dials to peer: " + dst.Pretty())
	}
Jeromy's avatar
Jeromy committed
116

117
118
119
120
121
	delete(ds.ongoing, dst) // remove ongoing dial
	close(wait)             // release everyone else
	ds.lock.Unlock()
}

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
// dialbackoff is a struct used to avoid over-dialing the same, dead peers.
// Whenever we totally fail to dial a peer (all dialAttempts attempts), we add
// them to dialbackoff. Then, whenever goroutines would _wait_ (dialsync), they
// check dialbackoff. If it's there, they don't wait and exit promptly with
// an error. (the single goroutine that is actually dialing continues to
// dial). If a dial is successful, the peer is removed from backoff.
// Example:
//
//  for {
//  	if ok, wait := dialsync.Lock(p); !ok {
//  		if backoff.Backoff(p) {
//  			return errDialFailed
//  		}
//  		<-wait
//  		continue
//  	}
//  	defer dialsync.Unlock(p)
//  	c, err := actuallyDial(p)
//  	if err != nil {
//  		dialbackoff.AddBackoff(p)
//  		continue
//  	}
//  	dialbackoff.Clear(p)
//  }
//
Jeromy's avatar
Jeromy committed
147

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
148
type dialbackoff struct {
Jeromy's avatar
Jeromy committed
149
	entries map[peer.ID]*backoffPeer
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
150
151
152
	lock    sync.RWMutex
}

Jeromy's avatar
Jeromy committed
153
154
155
156
157
// backoffPeer is the backoff record kept for a single peer.
type backoffPeer struct {
	// tries counts the AddBackoff calls made for the peer since the record
	// was created (it starts at 1).
	tries int
	// until is the time before which Backoff reports true for the peer.
	until time.Time
}

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
158
159
func (db *dialbackoff) init() {
	if db.entries == nil {
Jeromy's avatar
Jeromy committed
160
		db.entries = make(map[peer.ID]*backoffPeer)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
161
162
163
164
	}
}

// Backoff returns whether the client should backoff from dialing
Jeromy's avatar
Jeromy committed
165
166
// peer p
func (db *dialbackoff) Backoff(p peer.ID) (backoff bool) {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
167
	db.lock.Lock()
Jeromy's avatar
Jeromy committed
168
	defer db.lock.Unlock()
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
169
	db.init()
Jeromy's avatar
Jeromy committed
170
171
172
173
174
175
	bp, found := db.entries[p]
	if found && time.Now().Before(bp.until) {
		return true
	}

	return false
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
176
177
}

Jeromy's avatar
Jeromy committed
178
179
180
// Bounds used by dialbackoff.AddBackoff when computing a backoff deadline.
const (
	// baseBackoffTime is the delay every backoff window starts from.
	baseBackoffTime = 5 * time.Second
	// maxBackoffTime caps the extra, tries-dependent delay that is added on
	// top of baseBackoffTime.
	maxBackoffTime = 5 * time.Minute
)

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
181
182
183
184
185
// AddBackoff lets other nodes know that we've entered backoff with
// peer p, so dialers should not wait unnecessarily. We still will
// attempt to dial with one goroutine, in case we get through.
func (db *dialbackoff) AddBackoff(p peer.ID) {
	db.lock.Lock()
Jeromy's avatar
Jeromy committed
186
	defer db.lock.Unlock()
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
187
	db.init()
Jeromy's avatar
Jeromy committed
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
	bp, ok := db.entries[p]
	if !ok {
		db.entries[p] = &backoffPeer{
			tries: 1,
			until: time.Now().Add(baseBackoffTime),
		}
		return
	}

	expTimeAdd := time.Second * time.Duration(bp.tries*bp.tries)
	if expTimeAdd > maxBackoffTime {
		expTimeAdd = maxBackoffTime
	}
	bp.until = time.Now().Add(baseBackoffTime + expTimeAdd)
	bp.tries++
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
203
204
205
206
207
208
}

// Clear removes a backoff record. Clients should call this after a
// successful Dial.
func (db *dialbackoff) Clear(p peer.ID) {
	db.lock.Lock()
Jeromy's avatar
Jeromy committed
209
	defer db.lock.Unlock()
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
210
211
212
213
	db.init()
	delete(db.entries, p)
}

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
214
215
216
217
218
219
220
// Dial connects to a peer.
//
// The idea is that the client of Swarm does not need to know what network
// the connection will happen over. Swarm can use whichever it choses.
// This allows us to use various transport protocols, do NAT traversal/relay,
// etc. to achive connection.
func (s *Swarm) Dial(ctx context.Context, p peer.ID) (*Conn, error) {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
221
	var logdial = lgbl.Dial("swarm", s.LocalPeer(), p, nil, nil)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
222
	if p == s.local {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
223
224
		log.Event(ctx, "swarmDialSelf", logdial)
		return nil, ErrDialToSelf
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
225
226
	}

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
227
228
	return s.gatedDialAttempt(ctx, p)
}
229

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
230
231
232
233
234
235
236
237
238
// bestConnectionToPeer returns the first non-nil connection among
// s.ConnectionsToPeer(p), or nil when there is none. (TODO pick better)
func (s *Swarm) bestConnectionToPeer(p peer.ID) *Conn {
	for _, c := range s.ConnectionsToPeer(p) {
		if c == nil {
			continue
		}
		return c // dump out the first one we find.
	}
	return nil
}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
239

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
240
241
242
243
244
// gatedDialAttempt is an attempt to dial a node. It is gated by the swarm's
// dial synchronization systems: dialsync and dialbackoff.
func (s *Swarm) gatedDialAttempt(ctx context.Context, p peer.ID) (*Conn, error) {
	var logdial = lgbl.Dial("swarm", s.LocalPeer(), p, nil, nil)
	defer log.EventBegin(ctx, "swarmDialAttemptSync", logdial).Done()
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
245

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
246
247
248
249
250
	// check if we already have an open connection first
	conn := s.bestConnectionToPeer(p)
	if conn != nil {
		return conn, nil
	}
251

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
252
253
	// check if there's an ongoing dial to this peer
	if ok, wait := s.dsync.Lock(p); ok {
Jeromy's avatar
Jeromy committed
254
255
256
257
258
259
260
261
		defer s.dsync.Unlock(p)

		// if this peer has been backed off, lets get out of here
		if s.backf.Backoff(p) {
			log.Event(ctx, "swarmDialBackoff", logdial)
			return nil, ErrDialBackoff
		}

262
		// ok, we have been charged to dial! let's do it.
263
		// if it succeeds, dial will add the conn to the swarm itself.
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
264
		defer log.EventBegin(ctx, "swarmDialAttemptStart", logdial).Done()
265
		ctxT, cancel := context.WithTimeout(ctx, s.dialT)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
266
		conn, err := s.dial(ctxT, p)
267
		cancel()
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
268
		log.Debugf("dial end %s", conn)
269
		if err != nil {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
270
			log.Event(ctx, "swarmDialBackoffAdd", logdial)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
271
272
			s.backf.AddBackoff(p) // let others know to backoff

273
274
			// ok, we failed. try again. (if loop is done, our error is output)
			return nil, fmt.Errorf("dial attempt failed: %s", err)
275
		}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
276
		log.Event(ctx, "swarmDialBackoffClear", logdial)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
277
		s.backf.Clear(p) // okay, no longer need to backoff
278
		return conn, nil
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301

	} else {
		// we did not dial. we must wait for someone else to dial.

		// check whether we should backoff first...
		if s.backf.Backoff(p) {
			log.Event(ctx, "swarmDialBackoff", logdial)
			return nil, ErrDialBackoff
		}

		defer log.EventBegin(ctx, "swarmDialWait", logdial).Done()
		select {
		case <-wait: // wait for that other dial to finish.

			// see if it worked, OR we got an incoming dial in the meantime...
			conn := s.bestConnectionToPeer(p)
			if conn != nil {
				return conn, nil
			}
			return nil, ErrDialFailed
		case <-ctx.Done(): // or we may have to bail...
			return nil, ctx.Err()
		}
302
303
304
305
306
	}
}

// dial is the actual swarm's dial logic, gated by Dial.
func (s *Swarm) dial(ctx context.Context, p peer.ID) (*Conn, error) {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
307
	var logdial = lgbl.Dial("swarm", s.LocalPeer(), p, nil, nil)
308
	if p == s.local {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
309
310
		log.Event(ctx, "swarmDialDoDialSelf", logdial)
		return nil, ErrDialToSelf
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
311
	}
312
313
	defer log.EventBegin(ctx, "swarmDialDo", logdial).Done()
	logdial["dial"] = "failure" // start off with failure. set to "success" at the end.
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
314
315

	sk := s.peers.PrivKey(s.local)
316
	logdial["encrypted"] = (sk != nil) // log wether this will be an encrypted dial or not.
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
317
	if sk == nil {
318
		// fine for sk to be nil, just log.
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
319
		log.Debug("Dial not given PrivateKey, so WILL NOT SECURE conn.")
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
320
321
	}

322
	ila, _ := s.InterfaceListenAddresses()
Jeromy's avatar
Jeromy committed
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
	subtract_filter := addrutil.SubtractFilter(append(ila, s.peers.Addrs(s.local)...)...)

	// get live channel of addresses for peer, filtered by the given filters
	/*
		remoteAddrChan := s.peers.AddrsChan(ctx, p,
			addrutil.AddrUsableFilter,
			subtract_filter,
			s.Filters.AddrBlocked)
	*/

	////// TEMP UNTIL PEERSTORE GETS UPGRADED
	// Ref: https://github.com/ipfs/go-libp2p-peer/pull/1
	paddrs := s.peers.Addrs(p)
	good_addrs := addrutil.FilterAddrs(paddrs,
		addrutil.AddrUsableFunc,
		subtract_filter,
		addrutil.FilterNeg(s.Filters.AddrBlocked),
	)
	remoteAddrChan := make(chan ma.Multiaddr, len(good_addrs))
	for _, a := range good_addrs {
		remoteAddrChan <- a
344
	}
Jeromy's avatar
Jeromy committed
345
346
	close(remoteAddrChan)
	/////////
347

348
	// try to get a connection to any addr
Jeromy's avatar
Jeromy committed
349
	connC, err := s.dialAddrs(ctx, p, remoteAddrChan)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
350
	if err != nil {
351
		logdial["error"] = err
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
352
353
		return nil, err
	}
354
	logdial["netconn"] = lgbl.NetConn(connC)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
355
356

	// ok try to setup the new connection.
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
357
	defer log.EventBegin(ctx, "swarmDialDoSetup", logdial, lgbl.NetConn(connC)).Done()
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
358
359
	swarmC, err := dialConnSetup(ctx, s, connC)
	if err != nil {
360
		logdial["error"] = err
361
		connC.Close() // close the connection. didn't work out :(
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
362
363
364
		return nil, err
	}

365
	logdial["dial"] = "success"
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
366
367
368
	return swarmC, nil
}

Jeromy's avatar
Jeromy committed
369
func (s *Swarm) dialAddrs(ctx context.Context, p peer.ID, remoteAddrs <-chan ma.Multiaddr) (conn.Conn, error) {
370
	log.Debugf("%s swarm dialing %s %s", s.local, p, remoteAddrs)
371
372
373
374

	ctx, cancel := context.WithCancel(ctx)
	defer cancel() // cancel work when we exit func

Jeromy's avatar
Jeromy committed
375
376
	// use a single response type instead of errs and conns, reduces complexity *a ton*
	respch := make(chan dialResult)
377

Jeromy's avatar
Jeromy committed
378
379
	defaultDialFail := fmt.Errorf("failed to dial %s (default failure)", p)
	exitErr := defaultDialFail
Jeromy's avatar
Jeromy committed
380

Jeromy's avatar
Jeromy committed
381
382
	var active int
	for {
383
		select {
Jeromy's avatar
Jeromy committed
384
385
386
387
388
389
390
		case addr, ok := <-remoteAddrs:
			if !ok {
				remoteAddrs = nil
				if active == 0 {
					return nil, exitErr
				}
				continue
391
			}
392

Jeromy's avatar
Jeromy committed
393
394
395
396
397
398
399
400
			// limitedDial will start a dial to the given peer when
			// it is able, respecting the various different types of rate
			// limiting that occur without using extra goroutines per addr
			s.limitedDial(ctx, p, addr, respch)
			active++
		case <-ctx.Done():
			if exitErr == defaultDialFail {
				exitErr = ctx.Err()
Jeromy's avatar
Jeromy committed
401
			}
Jeromy's avatar
Jeromy committed
402
403
404
405
406
407
408
409
410
411
412
413
414
			return nil, exitErr
		case resp := <-respch:
			active--
			if resp.Err != nil {
				log.Error("got error on dial: ", resp.Err)
				// Errors are normal, lots of dials will fail
				exitErr = resp.Err

				if remoteAddrs == nil && active == 0 {
					return nil, exitErr
				}
			} else if resp.Conn != nil {
				return resp.Conn, nil
415
			}
416
		}
417
	}
Jeromy's avatar
Jeromy committed
418
419
420
421
422
423
424
425
426
}

// limitedDial hands a dial of address a for peer p to the swarm's dial
// limiter as a dialJob that carries ctx and the result channel resp.
func (s *Swarm) limitedDial(ctx context.Context, p peer.ID, a ma.Multiaddr, resp chan dialResult) {
	job := &dialJob{
		ctx:  ctx,
		peer: p,
		addr: a,
		resp: resp,
	}
	s.limiter.AddDialJob(job)
}

Jeromy's avatar
Jeromy committed
429
func (s *Swarm) dialAddr(ctx context.Context, p peer.ID, addr ma.Multiaddr) (conn.Conn, error) {
430
431
	log.Debugf("%s swarm dialing %s %s", s.local, p, addr)

Jeromy's avatar
Jeromy committed
432
	connC, err := s.dialer.Dial(ctx, addr, p)
433
	if err != nil {
434
		return nil, fmt.Errorf("%s --> %s dial attempt failed: %s", s.local, p, err)
435
	}
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453

	// if the connection is not to whom we thought it would be...
	remotep := connC.RemotePeer()
	if remotep != p {
		connC.Close()
		return nil, fmt.Errorf("misdial to %s through %s (got %s)", p, addr, remotep)
	}

	// if the connection is to ourselves...
	// this can happen TONS when Loopback addrs are advertized.
	// (this should be caught by two checks above, but let's just make sure.)
	if remotep == s.local {
		connC.Close()
		return nil, fmt.Errorf("misdial to %s through %s (got self)", p, addr)
	}

	// success! we got one!
	return connC, nil
454
455
}

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
456
457
458
459
460
461
462
463
464
465
466
467
468
// dialConnSetup is the setup logic for a connection from the dial side. it
// needs to add the Conn to the StreamSwarm, then run newConnSetup
func dialConnSetup(ctx context.Context, s *Swarm, connC conn.Conn) (*Conn, error) {

	psC, err := s.swarm.AddConn(connC)
	if err != nil {
		// connC is closed by caller if we fail.
		return nil, fmt.Errorf("failed to add conn to ps.Swarm: %s", err)
	}

	// ok try to setup the new connection. (newConnSetup will add to group)
	swarmC, err := s.newConnSetup(ctx, psC)
	if err != nil {
469
		psC.Close() // we need to make sure psC is Closed.
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
470
471
472
473
474
		return nil, err
	}

	return swarmC, err
}