swarm.go 8.53 KB
Newer Older
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
1
2
3
4
5
// package swarm implements a connection muxer with a pair of channels
// to synchronize all network communication.
package swarm

import (
6
	"fmt"
7
	"io/ioutil"
8
	"sync"
9
	"time"
10

Jeromy's avatar
Jeromy committed
11
12
	metrics "github.com/ipfs/go-libp2p/p2p/metrics"
	mconn "github.com/ipfs/go-libp2p/p2p/metrics/conn"
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
13
	inet "github.com/ipfs/go-libp2p/p2p/net"
Jeromy's avatar
Jeromy committed
14
	conn "github.com/ipfs/go-libp2p/p2p/net/conn"
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
15
16
	filter "github.com/ipfs/go-libp2p/p2p/net/filter"
	addrutil "github.com/ipfs/go-libp2p/p2p/net/swarm/addr"
Jeromy's avatar
Jeromy committed
17
	transport "github.com/ipfs/go-libp2p/p2p/net/transport"
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
18
	peer "github.com/ipfs/go-libp2p/p2p/peer"
Jeromy's avatar
Jeromy committed
19

Jeromy's avatar
Jeromy committed
20
	mafilter "gx/ipfs/QmPwfFAHUmvWDucLHRS9Xz2Kb1TNX2cY4LJ7pQjg9kVcae/multiaddr-filter"
Jeromy's avatar
Jeromy committed
21
22
	"gx/ipfs/QmQopLATEYMNg7dVqZRNDfeE2S1yKy8zrRh5xnYiuqeZBn/goprocess"
	goprocessctx "gx/ipfs/QmQopLATEYMNg7dVqZRNDfeE2S1yKy8zrRh5xnYiuqeZBn/goprocess/context"
23
24
25
26
27
	pst "gx/ipfs/QmWSJzRkCMJFHYUQZxKwPX8WA7XipaPtfiwMPARP51ymfn/go-stream-muxer"
	psmss "gx/ipfs/QmWSJzRkCMJFHYUQZxKwPX8WA7XipaPtfiwMPARP51ymfn/go-stream-muxer/multistream"
	spdy "gx/ipfs/QmWSJzRkCMJFHYUQZxKwPX8WA7XipaPtfiwMPARP51ymfn/go-stream-muxer/spdystream"
	yamux "gx/ipfs/QmWSJzRkCMJFHYUQZxKwPX8WA7XipaPtfiwMPARP51ymfn/go-stream-muxer/yamux"
	ps "gx/ipfs/QmZK81vcgMhpb2t7GNbozk7qzt6Rj4zFqitpvsWT9mduW8/go-peerstream"
Jeromy's avatar
Jeromy committed
28
29
	context "gx/ipfs/QmZy2y8t9zQH2a1b8q2ZSLKp17ATuJoCNxxyMFG5qFExpt/go-net/context"
	logging "gx/ipfs/Qmazh5oNUVsDZTs2g59rq8aYQqwpss8tcUWQzor5sCCEuH/go-log"
Jeromy's avatar
Jeromy committed
30
	ma "gx/ipfs/QmcobAGsCjYt5DXoq9et9L8yR8er7o7Cu3DTvpaq12jYSz/go-multiaddr"
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
31
32
)

Jeromy's avatar
Jeromy committed
33
var log = logging.Logger("swarm2")
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
34

35
36
37
var PSTransport pst.Transport

func init() {
38
39
40
	msstpt := psmss.NewBlankTransport()

	ymxtpt := &yamux.Transport{
Jeromy's avatar
Jeromy committed
41
		AcceptBacklog:          8192,
42
43
44
45
46
47
48
49
50
51
52
		ConnectionWriteTimeout: time.Second * 10,
		KeepAliveInterval:      time.Second * 30,
		EnableKeepAlive:        true,
		MaxStreamWindowSize:    uint32(1024 * 256),
		LogOutput:              ioutil.Discard,
	}

	msstpt.AddTransport("/yamux", ymxtpt)
	msstpt.AddTransport("/spdystream", spdy.Transport)

	PSTransport = msstpt
53
}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
54
55
56
57
58
59
60
61
62
63
64
65

// Swarm is a connection muxer, allowing connections to other peers to
// be opened and closed, while still using the same Chan for all
// communication. The Chan sends/receives Messages, which note the
// destination or source Peer.
//
// Uses peerstream.Swarm
type Swarm struct {
	swarm *ps.Swarm
	local peer.ID
	peers peer.Peerstore
	connh ConnHandler
66

67
	dsync dialsync
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
68
	backf dialbackoff
69
	dialT time.Duration // mainly for tests
70

Jeromy's avatar
Jeromy committed
71
72
	dialer *conn.Dialer

73
74
75
	notifmu sync.RWMutex
	notifs  map[inet.Notifiee]ps.Notifiee

Jeromy's avatar
Jeromy committed
76
77
	transports []transport.Transport

Jeromy's avatar
Jeromy committed
78
	// filters for addresses that shouldnt be dialed
79
	Filters *filter.Filters
Jeromy's avatar
Jeromy committed
80

Jeromy's avatar
Jeromy committed
81
82
83
	// file descriptor rate limited
	fdRateLimit chan struct{}

84
	proc goprocess.Process
85
	ctx  context.Context
86
	bwc  metrics.Reporter
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
87
88
89
90
}

// NewSwarm constructs a Swarm, with a Chan.
func NewSwarm(ctx context.Context, listenAddrs []ma.Multiaddr,
Jeromy's avatar
Jeromy committed
91
	local peer.ID, peers peer.Peerstore, bwc metrics.Reporter) (*Swarm, error) {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
92

93
94
95
	listenAddrs, err := filterAddrs(listenAddrs)
	if err != nil {
		return nil, err
96
97
	}

Jeromy's avatar
Jeromy committed
98
99
100
101
	wrap := func(c transport.Conn) transport.Conn {
		return mconn.WrapConn(bwc, c)
	}

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
102
	s := &Swarm{
103
104
105
106
107
108
109
110
111
112
		swarm:  ps.NewSwarm(PSTransport),
		local:  local,
		peers:  peers,
		ctx:    ctx,
		dialT:  DialTimeout,
		notifs: make(map[inet.Notifiee]ps.Notifiee),
		transports: []transport.Transport{
			transport.NewTCPTransport(),
			transport.NewUtpTransport(),
		},
Jeromy's avatar
Jeromy committed
113
114
115
116
		bwc:         bwc,
		fdRateLimit: make(chan struct{}, concurrentFdDials),
		Filters:     filter.NewFilters(),
		dialer:      conn.NewDialer(local, peers.PrivKey(local), wrap),
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
117
118
119
	}

	// configure Swarm
120
	s.proc = goprocessctx.WithContextAndTeardown(ctx, s.teardown)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
121
122
	s.SetConnHandler(nil) // make sure to setup our own conn handler.

Jeromy's avatar
Jeromy committed
123
124
125
126
127
128
	err = s.setupInterfaces(listenAddrs)
	if err != nil {
		return nil, err
	}

	return s, nil
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
129
130
131
132
133
134
}

func (s *Swarm) teardown() error {
	return s.swarm.Close()
}

135
136
137
138
139
140
141
142
143
func (s *Swarm) AddAddrFilter(f string) error {
	m, err := mafilter.NewMask(f)
	if err != nil {
		return err
	}

	s.Filters.AddDialFilter(m)
	return nil
}
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
func filterAddrs(listenAddrs []ma.Multiaddr) ([]ma.Multiaddr, error) {
	if len(listenAddrs) > 0 {
		filtered := addrutil.FilterUsableAddrs(listenAddrs)
		if len(filtered) < 1 {
			return nil, fmt.Errorf("swarm cannot use any addr in: %s", listenAddrs)
		}
		listenAddrs = filtered
	}
	return listenAddrs, nil
}

func (s *Swarm) Listen(addrs ...ma.Multiaddr) error {
	addrs, err := filterAddrs(addrs)
	if err != nil {
		return err
	}

Jeromy's avatar
Jeromy committed
161
	return s.setupInterfaces(addrs)
162
163
}

164
165
166
// Process returns the Process of the swarm
func (s *Swarm) Process() goprocess.Process {
	return s.proc
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
167
168
}

169
170
171
172
173
// Context returns the context of the swarm
func (s *Swarm) Context() context.Context {
	return s.ctx
}

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
174
175
// Close stops the Swarm.
func (s *Swarm) Close() error {
176
	return s.proc.Close()
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
}

// StreamSwarm returns the underlying peerstream.Swarm
func (s *Swarm) StreamSwarm() *ps.Swarm {
	return s.swarm
}

// SetConnHandler assigns the handler for new connections.
// See peerstream. You will rarely use this. See SetStreamHandler
func (s *Swarm) SetConnHandler(handler ConnHandler) {

	// handler is nil if user wants to clear the old handler.
	if handler == nil {
		s.swarm.SetConnHandler(func(psconn *ps.Conn) {
			s.connHandler(psconn)
		})
		return
	}

	s.swarm.SetConnHandler(func(psconn *ps.Conn) {
		// sc is nil if closed in our handler.
		if sc := s.connHandler(psconn); sc != nil {
			// call the user's handler. in a goroutine for sync safety.
			go handler(sc)
		}
	})
}

// SetStreamHandler assigns the handler for new streams.
// See peerstream.
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
207
func (s *Swarm) SetStreamHandler(handler inet.StreamHandler) {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
208
209
210
211
212
213
	s.swarm.SetStreamHandler(func(s *ps.Stream) {
		handler(wrapStream(s))
	})
}

// NewStreamWithPeer creates a new stream on any available connection to p
214
func (s *Swarm) NewStreamWithPeer(ctx context.Context, p peer.ID) (*Stream, error) {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
215
216
217
	// if we have no connections, try connecting.
	if len(s.ConnectionsToPeer(p)) == 0 {
		log.Debug("Swarm: NewStreamWithPeer no connections. Attempting to connect...")
218
		if _, err := s.Dial(ctx, p); err != nil {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
219
220
221
222
223
			return nil, err
		}
	}
	log.Debug("Swarm: NewStreamWithPeer...")

224
	// TODO: think about passing a context down to NewStreamWithGroup
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
	st, err := s.swarm.NewStreamWithGroup(p)
	return wrapStream(st), err
}

// StreamsWithPeer returns all the live Streams to p
func (s *Swarm) StreamsWithPeer(p peer.ID) []*Stream {
	return wrapStreams(ps.StreamsWithGroup(p, s.swarm.Streams()))
}

// ConnectionsToPeer returns all the live connections to p
func (s *Swarm) ConnectionsToPeer(p peer.ID) []*Conn {
	return wrapConns(ps.ConnsWithGroup(p, s.swarm.Conns()))
}

// Connections returns a slice of all connections.
func (s *Swarm) Connections() []*Conn {
	return wrapConns(s.swarm.Conns())
}

// CloseConnection removes a given peer from swarm + closes the connection
func (s *Swarm) CloseConnection(p peer.ID) error {
	conns := s.swarm.ConnsWithGroup(p) // boom.
	for _, c := range conns {
		c.Close()
	}
	return nil
}

// Peers returns a copy of the set of peers swarm is connected to.
func (s *Swarm) Peers() []peer.ID {
	conns := s.Connections()

	seen := make(map[peer.ID]struct{})
	peers := make([]peer.ID, 0, len(conns))
	for _, c := range conns {
		p := c.RemotePeer()
		if _, found := seen[p]; found {
			continue
		}

265
		seen[p] = struct{}{}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
266
267
268
269
270
271
272
273
274
		peers = append(peers, p)
	}
	return peers
}

// LocalPeer returns the local peer swarm is associated to.
func (s *Swarm) LocalPeer() peer.ID {
	return s.local
}
275

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
276
277
278
279
280
281
282
283
284
// notifyAll sends a signal to all Notifiees
func (s *Swarm) notifyAll(notify func(inet.Notifiee)) {
	s.notifmu.RLock()
	for f := range s.notifs {
		go notify(f)
	}
	s.notifmu.RUnlock()
}

285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
// Notify signs up Notifiee to receive signals when events happen
func (s *Swarm) Notify(f inet.Notifiee) {
	// wrap with our notifiee, to translate function calls
	n := &ps2netNotifee{net: (*Network)(s), not: f}

	s.notifmu.Lock()
	s.notifs[f] = n
	s.notifmu.Unlock()

	// register for notifications in the peer swarm.
	s.swarm.Notify(n)
}

// StopNotify unregisters Notifiee fromr receiving signals
func (s *Swarm) StopNotify(f inet.Notifiee) {
	s.notifmu.Lock()
	n, found := s.notifs[f]
	if found {
		delete(s.notifs, f)
	}
	s.notifmu.Unlock()

	if found {
		s.swarm.StopNotify(n)
	}
}

type ps2netNotifee struct {
	net *Network
	not inet.Notifiee
}

func (n *ps2netNotifee) Connected(c *ps.Conn) {
	n.not.Connected(n.net, inet.Conn((*Conn)(c)))
}

func (n *ps2netNotifee) Disconnected(c *ps.Conn) {
	n.not.Disconnected(n.net, inet.Conn((*Conn)(c)))
}

func (n *ps2netNotifee) OpenedStream(s *ps.Stream) {
	n.not.OpenedStream(n.net, inet.Stream((*Stream)(s)))
}

func (n *ps2netNotifee) ClosedStream(s *ps.Stream) {
	n.not.ClosedStream(n.net, inet.Stream((*Stream)(s)))
}