swarm_dial.go 7.45 KB
Newer Older
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
1
2
3
4
5
package swarm

import (
	"errors"
	"fmt"
6
	"sync"
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
7
8

	conn "github.com/jbenet/go-ipfs/p2p/net/conn"
9
	addrutil "github.com/jbenet/go-ipfs/p2p/net/swarm/addr"
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
10
11
12
13
	peer "github.com/jbenet/go-ipfs/p2p/peer"
	lgbl "github.com/jbenet/go-ipfs/util/eventlog/loggables"

	context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
14
	ma "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr"
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
15
16
)

17
18
19
// dialAttempts governs how many times a goroutine will try to dial a given peer.
// Every pass through the retry loop in Swarm.Dial counts against this limit,
// including passes spent waiting for another goroutine's in-flight dial.
const dialAttempts = 3

20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
// dialsync is a small object that helps manage ongoing dials.
// this way, if we receive many simultaneous dial requests for the same
// peer, one goroutine can do the dialing while the rest wait for it.
//
// would-be dialers are expected to use it like this:
//
//  for {
//  	c := findConnectionToPeer(peer)
//  	if c != nil {
//  		return c
//  	}
//
//  	// ok, no connections. should we dial?
//  	if ok, wait := dialsync.Lock(peer); !ok {
//  		<-wait // can optionally wait
//  		continue
//  	}
//  	defer dialsync.Unlock(peer)
//
//  	return actuallyDial(peer)
//  }
//
// the zero value is usable: Lock allocates the map on first use.
type dialsync struct {
	// ongoing is a map of tickets for the current peers being dialed.
	// each ticket is a channel that Unlock closes when the attempt ends.
	// this way, we dont kick off N dials simultaneously.
	ongoing map[peer.ID]chan struct{}
	lock    sync.Mutex // held by Lock and Unlock while they touch ongoing
}

// Lock marks the beginning of a dial attempt to dst.
//
// If nobody is currently dialing dst, Lock registers the caller as the
// dialer and returns (true, nil). The caller MUST then call ds.Unlock(dst)
// once its attempt is over -- whether or not the dial succeeded -- so that
// everyone waiting is released.
//
// If a dial to dst is already in flight, Lock returns (false, wait), where
// wait is the channel that Unlock closes when that attempt ends. The caller
// may block on it, or not.
//
// see the example on `dialsync`
func (ds *dialsync) Lock(dst peer.ID) (bool, chan struct{}) {
	ds.lock.Lock()
	defer ds.lock.Unlock()

	// lazily set up the ticket map so the zero value works.
	if ds.ongoing == nil {
		ds.ongoing = make(map[peer.ID]chan struct{})
	}

	// somebody else holds the ticket: hand back their channel to wait on.
	if inflight, busy := ds.ongoing[dst]; busy {
		return false, inflight
	}

	// nobody is dialing dst. the caller takes the ticket.
	ds.ongoing[dst] = make(chan struct{})
	return true, nil
}

// Unlock ends the dial attempt to dst that was started with Lock: it removes
// the ticket for dst and closes its channel, releasing every waiter.
//
// if Unlock(p) is called without calling Lock(p) first, Unlock panics.
func (ds *dialsync) Unlock(dst peer.ID) {
	ds.lock.Lock()
	// deferred so the mutex is released on the panic path too; otherwise a
	// recovered panic would leave ds.lock held and block all later dials.
	defer ds.lock.Unlock()

	wait, found := ds.ongoing[dst]
	if !found {
		panic("called dialDone with no ongoing dials to peer: " + dst.Pretty())
	}
	delete(ds.ongoing, dst) // remove ongoing dial
	close(wait)             // release everyone else
}

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
91
92
93
94
95
96
97
98
99
100
101
// Dial connects to a peer.
//
// The idea is that the client of Swarm does not need to know what network
// the connection will happen over. Swarm can use whichever it choses.
// This allows us to use various transport protocols, do NAT traversal/relay,
// etc. to achive connection.
func (s *Swarm) Dial(ctx context.Context, p peer.ID) (*Conn, error) {
	if p == s.local {
		return nil, errors.New("Attempted connection to self!")
	}

102
103
104
105
106
	// this loop is here because dials take time, and we should not be dialing
	// the same peer concurrently (silly waste). Additonally, it's structured
	// to check s.ConnectionsToPeer(p) _first_, and _between_ attempts because we
	// may have received an incoming connection! if so, we no longer must dial.
	//
107
	// During the dial attempts, we may be doing the dialing. if not, we wait.
108
109
	var err error
	var conn *Conn
110
	for i := 0; i < dialAttempts; i++ {
111
112
		// check if we already have an open connection first
		cs := s.ConnectionsToPeer(p)
113
114
115
		for _, conn = range cs {
			if conn != nil { // dump out the first one we find. (TODO pick better)
				return conn, nil
116
			}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
117
		}
118
119

		// check if there's an ongoing dial to this peer
120
		if ok, wait := s.dsync.Lock(p); !ok {
121
			select {
122
123
124
			case <-wait: // wait for that dial to finish.
				continue // and see if it worked (loop), OR we got an incoming dial.
			case <-ctx.Done(): // or we may have to bail...
125
126
127
128
				return nil, ctx.Err()
			}
		}

129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
		// ok, we have been charged to dial! let's do it.
		conn, err = s.dial(ctx, p)
		s.dsync.Unlock(p)
		if err != nil {
			continue // ok, we failed. try again. (if loop is done, our error is output)
		}
		return conn, nil
	}
	return nil, err
}

// dial is the actual swarm's dial logic, gated by Dial.
func (s *Swarm) dial(ctx context.Context, p peer.ID) (*Conn, error) {
	if p == s.local {
		return nil, errors.New("Attempted connection to self!")
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
144
145
146
147
148
149
150
151
	}

	sk := s.peers.PrivKey(s.local)
	if sk == nil {
		// may be fine for sk to be nil, just log a warning.
		log.Warning("Dial not given PrivateKey, so WILL NOT SECURE conn.")
	}

152
153
154
155
156
157
158
	// get our own addrs
	localAddrs := s.peers.Addresses(s.local)
	if len(localAddrs) == 0 {
		log.Debug("Dialing out with no local addresses.")
	}

	// get remote peer addrs
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
159
	remoteAddrs := s.peers.Addresses(p)
160
	// make sure we can use the addresses.
161
	remoteAddrs = addrutil.FilterUsableAddrs(remoteAddrs)
162
163
164
	// drop out any addrs that would just dial ourselves. use ListenAddresses
	// as that is a more authoritative view than localAddrs.
	remoteAddrs = addrutil.Subtract(remoteAddrs, s.ListenAddresses())
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
165
166
167
168
169
170
171
172
173
174
175
	if len(remoteAddrs) == 0 {
		return nil, errors.New("peer has no addresses")
	}

	// open connection to peer
	d := &conn.Dialer{
		LocalPeer:  s.local,
		LocalAddrs: localAddrs,
		PrivateKey: sk,
	}

176
177
	// try to get a connection to any addr
	connC, err := s.dialAddrs(ctx, d, p, remoteAddrs)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
	if err != nil {
		return nil, err
	}

	// ok try to setup the new connection.
	swarmC, err := dialConnSetup(ctx, s, connC)
	if err != nil {
		log.Error("Dial newConnSetup failed. disconnecting.")
		log.Event(ctx, "dialFailureDisconnect", lgbl.NetConn(connC), lgbl.Error(err))
		swarmC.Close() // close the connection. didn't work out :(
		return nil, err
	}

	log.Event(ctx, "dial", p)
	return swarmC, nil
}

195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
// dialAddrs tries each of remoteAddrs in order with d and returns the first
// connection whose remote peer is p. Connections that reach a different peer
// are closed and skipped. If no address works, the returned error includes
// the most recent failure so the caller can see why.
func (s *Swarm) dialAddrs(ctx context.Context, d *conn.Dialer, p peer.ID, remoteAddrs []ma.Multiaddr) (conn.Conn, error) {

	// try to connect to one of the peer's known addresses.
	// for simplicity, we do this sequentially.
	// A future commit will do this asynchronously.
	var lastErr error // most recent reason an address was rejected
	for _, addr := range remoteAddrs {
		connC, err := d.Dial(ctx, addr, p)
		if err != nil {
			lastErr = err
			continue
		}

		// if the connection is not to whom we thought it would be...
		if connC.RemotePeer() != p {
			log.Infof("misdial to %s through %s (got %s)", p, addr, connC.RemoteMultiaddr())
			connC.Close()
			lastErr = fmt.Errorf("misdial through %s: reached a different peer", addr)
			continue
		}

		// if the connection is to ourselves...
		// this can happen TONS when Loopback addrs are advertized.
		// (this should be caught by two checks above, but let's just make sure.)
		if connC.RemotePeer() == s.local {
			log.Infof("misdial to %s through %s", p, addr)
			connC.Close()
			lastErr = fmt.Errorf("misdial through %s: reached ourselves", addr)
			continue
		}

		// success! we got one!
		return connC, nil
	}

	if lastErr != nil {
		return nil, fmt.Errorf("failed to dial %s: %s", p, lastErr)
	}
	return nil, fmt.Errorf("failed to dial %s", p)
}

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
// dialConnSetup is the setup logic for a connection from the dial side. it
// adds connC to the swarm's underlying stream swarm (s.swarm), then runs
// newConnSetup on the result.
//
// on failure it returns a nil *Conn and a non-nil error. if adding connC
// fails, connC is left open for the caller to close.
func dialConnSetup(ctx context.Context, s *Swarm, connC conn.Conn) (*Conn, error) {

	psC, err := s.swarm.AddConn(connC)
	if err != nil {
		// connC is closed by caller if we fail.
		return nil, fmt.Errorf("failed to add conn to ps.Swarm: %s", err)
	}

	// ok try to setup the new connection. (newConnSetup will add to group)
	swarmC, err := s.newConnSetup(ctx, psC)
	if err != nil {
		log.Error("Dial newConnSetup failed. disconnecting.")
		log.Event(ctx, "dialFailureDisconnect", lgbl.NetConn(connC), lgbl.Error(err))
		// close psC itself rather than swarmC: swarmC is the result of the
		// call that just failed, and psC is the thing we must make sure is
		// closed.
		psC.Close()
		return nil, err
	}

	return swarmC, nil
}