swarm_dial.go 8.55 KB
Newer Older
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
1
2
3
4
5
package swarm

import (
	"errors"
	"fmt"
6
	"sync"
7
	"time"
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
8
9

	conn "github.com/jbenet/go-ipfs/p2p/net/conn"
10
	addrutil "github.com/jbenet/go-ipfs/p2p/net/swarm/addr"
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
11
12
13
14
	peer "github.com/jbenet/go-ipfs/p2p/peer"
	lgbl "github.com/jbenet/go-ipfs/util/eventlog/loggables"

	context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
15
	ma "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr"
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
16
17
)

18
19
20
// dialAttempts governs how many times a goroutine will try to dial a given peer.
// Swarm.Dial makes at most this many passes through its loop per call; a pass
// spent waiting on another goroutine's ongoing dial uses up an attempt too.
const dialAttempts = 3

21
22
23
24
25
// DialTimeout bounds how long a single dial attempt may take. It may be worth
// raising down the road, or replacing with more granular timeouts (i.e. one
// per subcomponent of Dial).
var DialTimeout = 30 * time.Second

26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
// dialsync coordinates concurrent dial requests to the same peer: when many
// goroutines ask to dial a peer at once, one of them does the dialing while
// the others wait for that attempt to finish.
//
// would-be dialers are expected to use it like this:
//
//  for {
//  	c := findConnectionToPeer(peer)
//  	if c != nil {
//  		return c
//  	}
//
//  	// ok, no connections. should we dial?
//  	if ok, wait := dialsync.Lock(peer); !ok {
//  		<-wait // can optionally wait
//  		continue
//  	}
//  	defer dialsync.Unlock(peer)
//
//  	c := actuallyDial(peer)
//  	return c
//  }
//
type dialsync struct {
	// lock guards ongoing.
	lock sync.Mutex

	// ongoing maps each peer currently being dialed to its ticket: a channel
	// that is closed once the dial attempt ends. this is what keeps us from
	// kicking off N dials to the same peer simultaneously.
	ongoing map[peer.ID]chan struct{}
}

// Lock governs the beginning of a dial attempt.
//
// If no dial to dst is in progress, Lock registers one and returns
// (true, nil): the caller is now the one scheduled to dial, and MUST call
// ds.Unlock(dst) when it is done so that other callers are released. The
// caller is not responsible for achieving a successful dial, only for
// reporting the end of the attempt.
//
// If a dial to dst is already in progress, Lock returns false together with a
// channel that is closed when that attempt ends. Lock itself does not block on
// the ongoing dial; callers that want to wait receive from the channel.
//
// see the usage example in the documentation of the dialsync type.
func (ds *dialsync) Lock(dst peer.ID) (bool, chan struct{}) {
	ds.lock.Lock()
	defer ds.lock.Unlock()

	// the map is created lazily, so a zero-value dialsync is usable.
	if ds.ongoing == nil {
		ds.ongoing = make(map[peer.ID]chan struct{})
	}

	// somebody else is already dialing: hand back their ticket.
	if pending, dialing := ds.ongoing[dst]; dialing {
		return false, pending
	}

	// nobody is dialing yet, so the caller gets to. leave a ticket for others.
	ds.ongoing[dst] = make(chan struct{})
	return true, nil
}

// Unlock marks the end of a dial attempt to dst and releases everyone waiting
// on it: the ticket is removed from the set of ongoing dials and its channel
// is closed. see Lock.
//
// If Unlock(p) is called without calling Lock(p) first, Unlock panics.
func (ds *dialsync) Unlock(dst peer.ID) {
	ds.lock.Lock()
	// deferred so that the mutex is released even on the panic below;
	// otherwise a recovered panic would leave ds.lock held forever.
	defer ds.lock.Unlock()

	wait, found := ds.ongoing[dst]
	if !found {
		panic("called dialDone with no ongoing dials to peer: " + dst.Pretty())
	}
	delete(ds.ongoing, dst) // remove ongoing dial
	close(wait)             // release everyone else
}

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
97
98
99
100
101
102
103
104
105
106
107
// Dial connects to a peer.
//
// The idea is that the client of Swarm does not need to know what network
// the connection will happen over. Swarm can use whichever it choses.
// This allows us to use various transport protocols, do NAT traversal/relay,
// etc. to achive connection.
func (s *Swarm) Dial(ctx context.Context, p peer.ID) (*Conn, error) {
	if p == s.local {
		return nil, errors.New("Attempted connection to self!")
	}

108
109
110
111
112
	// this loop is here because dials take time, and we should not be dialing
	// the same peer concurrently (silly waste). Additonally, it's structured
	// to check s.ConnectionsToPeer(p) _first_, and _between_ attempts because we
	// may have received an incoming connection! if so, we no longer must dial.
	//
113
	// During the dial attempts, we may be doing the dialing. if not, we wait.
114
115
	var err error
	var conn *Conn
116
	for i := 0; i < dialAttempts; i++ {
117
118
		// check if we already have an open connection first
		cs := s.ConnectionsToPeer(p)
119
120
121
		for _, conn = range cs {
			if conn != nil { // dump out the first one we find. (TODO pick better)
				return conn, nil
122
			}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
123
		}
124
125

		// check if there's an ongoing dial to this peer
126
		if ok, wait := s.dsync.Lock(p); !ok {
127
			log.Debugf("swarm %s dialing %s -- waiting for ongoing dial", s.local, p)
128
			select {
129
130
131
			case <-wait: // wait for that dial to finish.
				continue // and see if it worked (loop), OR we got an incoming dial.
			case <-ctx.Done(): // or we may have to bail...
132
133
134
135
				return nil, ctx.Err()
			}
		}

136
		// ok, we have been charged to dial! let's do it.
137
		// if it succeeds, dial will add the conn to the swarm itself.
138
139
140
		log.Debugf("swarm %s dialing %s -- dial start", s.local, p)
		ctxT, _ := context.WithTimeout(ctx, DialTimeout)
		conn, err = s.dial(ctxT, p)
141
		s.dsync.Unlock(p)
142
		log.Debugf("swarm %s dialing %s -- dial end %s", s.local, p, conn)
143
144
145
146
147
		if err != nil {
			continue // ok, we failed. try again. (if loop is done, our error is output)
		}
		return conn, nil
	}
148
149
150
	if err == nil {
		err = fmt.Errorf("%s failed to dial %s after %d attempts", s.local, p, dialAttempts)
	}
151
152
153
154
155
156
157
	return nil, err
}

// dial is the actual swarm's dial logic, gated by Dial.
func (s *Swarm) dial(ctx context.Context, p peer.ID) (*Conn, error) {
	if p == s.local {
		return nil, errors.New("Attempted connection to self!")
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
158
159
160
161
162
163
164
165
	}

	sk := s.peers.PrivKey(s.local)
	if sk == nil {
		// may be fine for sk to be nil, just log a warning.
		log.Warning("Dial not given PrivateKey, so WILL NOT SECURE conn.")
	}

166
167
168
169
170
171
172
	// get our own addrs
	localAddrs := s.peers.Addresses(s.local)
	if len(localAddrs) == 0 {
		log.Debug("Dialing out with no local addresses.")
	}

	// get remote peer addrs
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
173
	remoteAddrs := s.peers.Addresses(p)
174
	// make sure we can use the addresses.
175
	remoteAddrs = addrutil.FilterUsableAddrs(remoteAddrs)
176
177
	// drop out any addrs that would just dial ourselves. use ListenAddresses
	// as that is a more authoritative view than localAddrs.
178
179
180
181
	ila, _ := InterfaceListenAddresses(s)
	remoteAddrs = addrutil.Subtract(remoteAddrs, ila)
	remoteAddrs = addrutil.Subtract(remoteAddrs, s.peers.Addresses(s.local))
	log.Debugf("%s swarm dialing %s -- remote:%s local:%s", s.local, p, remoteAddrs, s.ListenAddresses())
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
182
183
184
185
186
187
188
189
190
191
192
	if len(remoteAddrs) == 0 {
		return nil, errors.New("peer has no addresses")
	}

	// open connection to peer
	d := &conn.Dialer{
		LocalPeer:  s.local,
		LocalAddrs: localAddrs,
		PrivateKey: sk,
	}

193
194
	// try to get a connection to any addr
	connC, err := s.dialAddrs(ctx, d, p, remoteAddrs)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
195
196
197
198
199
200
201
202
203
	if err != nil {
		return nil, err
	}

	// ok try to setup the new connection.
	swarmC, err := dialConnSetup(ctx, s, connC)
	if err != nil {
		log.Error("Dial newConnSetup failed. disconnecting.")
		log.Event(ctx, "dialFailureDisconnect", lgbl.NetConn(connC), lgbl.Error(err))
204
		connC.Close() // close the connection. didn't work out :(
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
205
206
207
208
209
210
211
		return nil, err
	}

	log.Event(ctx, "dial", p)
	return swarmC, nil
}

212
213
214
215
216
// dialAddrs tries remoteAddrs one at a time, in order, using d, and returns
// the first connection whose remote peer is p. Connections that turn out to
// be to a different peer, or to ourselves, are closed and skipped.
//
// If no address yields a connection, the error of the last dial call is
// returned when there is one; otherwise (no addresses given, or the last
// address dialed fine but reached the wrong peer) a generic "failed to dial"
// error is returned.
func (s *Swarm) dialAddrs(ctx context.Context, d *conn.Dialer, p peer.ID, remoteAddrs []ma.Multiaddr) (conn.Conn, error) {

	// try to connect to one of the peer's known addresses.
	// for simplicity, we do this sequentially.
	// A future commit will do this asynchronously.
	log.Debugf("%s swarm dialing %s %s", s.local, p, remoteAddrs)
	var err error
	for _, addr := range remoteAddrs {
		log.Debugf("%s swarm dialing %s %s", s.local, p, addr)
		var connC conn.Conn
		connC, err = d.Dial(ctx, addr, p)
		if err != nil {
			// Infof, not Info: the message is a format string.
			log.Infof("%s --> %s dial attempt failed: %s", s.local, p, err)
			continue
		}

		// if the connection is not to whom we thought it would be...
		if connC.RemotePeer() != p {
			log.Infof("misdial to %s through %s (got %s)", p, addr, connC.RemotePeer())
			connC.Close()
			continue
		}

		// if the connection is to ourselves...
		// this can happen TONS when Loopback addrs are advertised.
		// (this should be caught by two checks above, but let's just make sure.)
		if connC.RemotePeer() == s.local {
			log.Infof("misdial to %s through %s", p, addr)
			connC.Close()
			continue
		}

		// success! we got one!
		return connC, nil
	}
	if err != nil {
		return nil, err
	}
	return nil, fmt.Errorf("failed to dial %s", p)
}

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
// dialConnSetup is the setup logic for a connection from the dial side. it
// needs to add the Conn to the StreamSwarm, then run newConnSetup
func dialConnSetup(ctx context.Context, s *Swarm, connC conn.Conn) (*Conn, error) {

	psC, err := s.swarm.AddConn(connC)
	if err != nil {
		// connC is closed by caller if we fail.
		return nil, fmt.Errorf("failed to add conn to ps.Swarm: %s", err)
	}

	// ok try to setup the new connection. (newConnSetup will add to group)
	swarmC, err := s.newConnSetup(ctx, psC)
	if err != nil {
		log.Error("Dial newConnSetup failed. disconnecting.")
		log.Event(ctx, "dialFailureDisconnect", lgbl.NetConn(connC), lgbl.Error(err))
268
		psC.Close() // we need to make sure psC is Closed.
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
269
270
271
272
273
		return nil, err
	}

	return swarmC, err
}