basic_host.go 14.6 KB
Newer Older
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
1
2
3
package basichost

import (
4
	"context"
5
	"io"
6
	"time"
7

Jeromy's avatar
Jeromy committed
8
9
	logging "github.com/ipfs/go-log"
	goprocess "github.com/jbenet/goprocess"
Steven Allen's avatar
Steven Allen committed
10
	goprocessctx "github.com/jbenet/goprocess/context"
11
	ifconnmgr "github.com/libp2p/go-libp2p-interface-connmgr"
Jeromy's avatar
Jeromy committed
12
	inet "github.com/libp2p/go-libp2p-net"
Jeromy's avatar
Jeromy committed
13
14
	peer "github.com/libp2p/go-libp2p-peer"
	pstore "github.com/libp2p/go-libp2p-peerstore"
Jeromy's avatar
Jeromy committed
15
	protocol "github.com/libp2p/go-libp2p-protocol"
Can ZHANG's avatar
Can ZHANG committed
16
	identify "github.com/libp2p/go-libp2p/p2p/protocol/identify"
vyzo's avatar
vyzo committed
17
	ping "github.com/libp2p/go-libp2p/p2p/protocol/ping"
Jeromy's avatar
Jeromy committed
18
	ma "github.com/multiformats/go-multiaddr"
19
	madns "github.com/multiformats/go-multiaddr-dns"
Jeromy's avatar
Jeromy committed
20
	msmux "github.com/multiformats/go-multistream"
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
21
22
)

23
// log is the package-level logger, created with the name "basichost".
var log = logging.Logger("basichost")
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
24

25
26
27
28
29
30
31
// Defaults that NewHost installs before applying any HostOpts overrides.
var (
	// DefaultNegotiationTimeout is the default value for HostOpts.NegotiationTimeout.
	DefaultNegotiationTimeout = 60 * time.Second

	// DefaultAddrsFactory is the default value for HostOpts.AddrsFactory.
	// It hands back the address list it was given, unmodified.
	DefaultAddrsFactory = func(in []ma.Multiaddr) []ma.Multiaddr { return in }
)
32

33
34
35
36
// AddrsFactory functions can be passed to New in order to override
// addresses returned by Addrs. An AddrsFactory receives the address list
// produced by AllAddrs and returns the list that Addrs should report.
type AddrsFactory func([]ma.Multiaddr) []ma.Multiaddr

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
37
// Option is the integer type of the legacy flags accepted by New.
//
// Deprecated: use HostOpts and NewHost instead.
type Option int

42
43
44
45
46
47
48
49
// NATPortMap makes the host attempt to open port-mapping in NAT devices
// for all its listeners. Pass in this option in the constructor to
// asynchronously a) find a gateway, b) open port mappings, c) republish
// port mappings periodically. The NATed addresses are included in the
// Host's Addrs() list.
//
// Deprecated: use HostOpts and NewHost instead.
const NATPortMap Option = iota
50

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
51
52
53
54
55
// BasicHost is the basic implementation of the host.Host interface. This
// particular host implementation:
//  * uses a protocol muxer to mux per-protocol streams
//  * uses an identity service to send + receive node information
//  * uses a nat service to establish NAT port mappings
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
56
type BasicHost struct {
57
58
59
	network    inet.Network
	mux        *msmux.MultistreamMuxer
	ids        *identify.IDService
vyzo's avatar
vyzo committed
60
	pings      *ping.PingService
61
62
	natmgr     NATManager
	maResolver *madns.Resolver
63
	cmgr       ifconnmgr.ConnManager
64

65
66
	AddrsFactory AddrsFactory

67
	negtimeout time.Duration
68

69
	proc goprocess.Process
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
70
71
}

72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
// HostOpts holds options that can be passed to NewHost in order to
// customize construction of the *BasicHost.
type HostOpts struct {

	// MultistreamMuxer is essential for the *BasicHost and will use a sensible default value if omitted.
	MultistreamMuxer *msmux.MultistreamMuxer

	// NegotiationTimeout determines the read and write timeouts on streams.
	// If 0 or omitted, it will use DefaultNegotiationTimeout.
	// If below 0, timeouts on streams will be deactivated.
	NegotiationTimeout time.Duration

	// IdentifyService holds an implementation of the /ipfs/id/ protocol.
	// If omitted, a new *identify.IDService will be used.
	IdentifyService *identify.IDService

	// AddrsFactory holds a function which can be used to override or filter the result of Addrs.
	// If omitted, there's no override or filtering, and the results of Addrs and AllAddrs are the same.
	AddrsFactory AddrsFactory

92
93
94
95
	// MultiaddrResolves holds the go-multiaddr-dns.Resolver used for resolving
	// /dns4, /dns6, and /dnsaddr addresses before trying to connect to a peer.
	MultiaddrResolver *madns.Resolver

96
97
	// NATManager takes care of setting NAT port mappings, and discovering external addresses.
	// If omitted, this will simply be disabled.
Steven Allen's avatar
Steven Allen committed
98
	NATManager func(inet.Network) NATManager
Jeromy's avatar
Jeromy committed
99
100

	// ConnManager is a libp2p connection manager
101
	ConnManager ifconnmgr.ConnManager
vyzo's avatar
vyzo committed
102
103
104

	// EnablePing indicates whether to instantiate the ping service
	EnablePing bool
105
106
107
}

// NewHost constructs a new *BasicHost and activates it by attaching its stream and connection handlers to the given inet.Network.
108
func NewHost(ctx context.Context, net inet.Network, opts *HostOpts) (*BasicHost, error) {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
109
	h := &BasicHost{
110
111
112
113
114
		network:      net,
		mux:          msmux.NewMultistreamMuxer(),
		negtimeout:   DefaultNegotiationTimeout,
		AddrsFactory: DefaultAddrsFactory,
		maResolver:   madns.DefaultResolver,
115
116
	}

Steven Allen's avatar
Steven Allen committed
117
	h.proc = goprocessctx.WithContextAndTeardown(ctx, func() error {
118
119
120
121
122
123
		if h.natmgr != nil {
			h.natmgr.Close()
		}
		return h.Network().Close()
	})

124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
	if opts.MultistreamMuxer != nil {
		h.mux = opts.MultistreamMuxer
	}

	if opts.IdentifyService != nil {
		h.ids = opts.IdentifyService
	} else {
		// we can't set this as a default above because it depends on the *BasicHost.
		h.ids = identify.NewIDService(h)
	}

	if uint64(opts.NegotiationTimeout) != 0 {
		h.negtimeout = opts.NegotiationTimeout
	}

	if opts.AddrsFactory != nil {
140
		h.AddrsFactory = opts.AddrsFactory
141
142
143
	}

	if opts.NATManager != nil {
Steven Allen's avatar
Steven Allen committed
144
		h.natmgr = opts.NATManager(net)
145
146
	}

147
148
149
150
	if opts.MultiaddrResolver != nil {
		h.maResolver = opts.MultiaddrResolver
	}

Jeromy's avatar
Jeromy committed
151
	if opts.ConnManager == nil {
152
		h.cmgr = &ifconnmgr.NullConnMgr{}
Jeromy's avatar
Jeromy committed
153
154
	} else {
		h.cmgr = opts.ConnManager
155
		net.Notify(h.cmgr.Notifee())
Jeromy's avatar
Jeromy committed
156
157
	}

vyzo's avatar
vyzo committed
158
159
160
161
	if opts.EnablePing {
		h.pings = ping.NewPingService(h)
	}

162
163
	net.SetConnHandler(h.newConnHandler)
	net.SetStreamHandler(h.newStreamHandler)
164
	return h, nil
165
}
Jeromy's avatar
Jeromy committed
166

167
// New constructs and sets up a new *BasicHost with given Network and options.
Steven Allen's avatar
Steven Allen committed
168
169
170
171
172
173
// The following options can be passed:
// * NATPortMap
// * AddrsFactory
// * ifconnmgr.ConnManager
// * madns.Resolver
//
174
175
176
// This function is deprecated in favor of NewHost and HostOpts.
func New(net inet.Network, opts ...interface{}) *BasicHost {
	hostopts := &HostOpts{}
177

178
	for _, o := range opts {
Jeromy's avatar
Jeromy committed
179
180
181
182
		switch o := o.(type) {
		case Option:
			switch o {
			case NATPortMap:
Steven Allen's avatar
Steven Allen committed
183
				hostopts.NATManager = NewNATManager
Jeromy's avatar
Jeromy committed
184
			}
185
		case AddrsFactory:
186
			hostopts.AddrsFactory = AddrsFactory(o)
187
		case ifconnmgr.ConnManager:
Jeromy's avatar
Jeromy committed
188
			hostopts.ConnManager = o
189
190
		case *madns.Resolver:
			hostopts.MultiaddrResolver = o
191
192
193
		}
	}

194
	h, err := NewHost(context.Background(), net, hostopts)
195
196
197
198
199
200
201
	if err != nil {
		// this cannot happen with legacy options
		// plus we want to keep the (deprecated) legacy interface unchanged
		panic(err)
	}

	return h
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
202
203
204
205
}

// newConnHandler is the remote-opened conn handler for inet.Network
func (h *BasicHost) newConnHandler(c inet.Conn) {
206
207
208
	// Clear protocols on connecting to new peer to avoid issues caused
	// by misremembering protocols between reconnects
	h.Peerstore().SetProtocols(c.RemotePeer())
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
209
210
211
212
	h.ids.IdentifyConn(c)
}

// newStreamHandler is the remote-opened stream handler for inet.Network
Jeromy's avatar
Jeromy committed
213
// TODO: this feels a bit wonky
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
214
func (h *BasicHost) newStreamHandler(s inet.Stream) {
215
	before := time.Now()
216

217
218
	if h.negtimeout > 0 {
		if err := s.SetDeadline(time.Now().Add(h.negtimeout)); err != nil {
219
			log.Error("setting stream deadline: ", err)
Steven Allen's avatar
Steven Allen committed
220
			s.Reset()
221
222
223
224
			return
		}
	}

225
	lzc, protoID, handle, err := h.Mux().NegotiateLazy(s)
226
	took := time.Now().Sub(before)
Jeromy's avatar
Jeromy committed
227
	if err != nil {
228
		if err == io.EOF {
229
230
231
232
233
			logf := log.Debugf
			if took > time.Second*10 {
				logf = log.Warningf
			}
			logf("protocol EOF: %s (took %s)", s.Conn().RemotePeer(), took)
234
		} else {
235
			log.Debugf("protocol mux failed: %s (took %s)", err, took)
236
		}
Steven Allen's avatar
Steven Allen committed
237
		s.Reset()
Jeromy's avatar
Jeromy committed
238
239
		return
	}
240

241
242
243
244
245
	s = &streamWrapper{
		Stream: s,
		rw:     lzc,
	}

246
	if h.negtimeout > 0 {
247
248
		if err := s.SetDeadline(time.Time{}); err != nil {
			log.Error("resetting stream deadline: ", err)
Steven Allen's avatar
Steven Allen committed
249
			s.Reset()
250
251
252
253
			return
		}
	}

254
	s.SetProtocol(protocol.ID(protoID))
255
	log.Debugf("protocol negotiation took %s", took)
Jeromy's avatar
Jeromy committed
256

257
	go handle(protoID, s)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
258
259
}

260
261
262
263
264
// PushIdentify pushes an identify update through the identify push protocol
// by calling Push on the host's identify service.
func (h *BasicHost) PushIdentify() {
	h.ids.Push()
}

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
265
266
267
268
269
270
// ID returns the (local) peer.ID associated with this Host, as reported by
// the underlying network.
func (h *BasicHost) ID() peer.ID {
	local := h.Network().LocalPeer()
	return local
}

// Peerstore returns the Host's repository of Peer Addresses and Keys.
Jeromy's avatar
Jeromy committed
271
func (h *BasicHost) Peerstore() pstore.Peerstore {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
272
273
274
	return h.Network().Peerstore()
}

275
// Network returns the Network interface of the Host
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
276
277
278
279
280
func (h *BasicHost) Network() inet.Network {
	return h.network
}

// Mux returns the Mux multiplexing incoming streams to protocol handlers
Jeromy's avatar
Jeromy committed
281
func (h *BasicHost) Mux() *msmux.MultistreamMuxer {
Brian Tiger Chow's avatar
Brian Tiger Chow committed
282
	return h.mux
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
283
284
}

285
// IDService returns
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
286
287
288
289
290
291
292
293
294
func (h *BasicHost) IDService() *identify.IDService {
	return h.ids
}

// SetStreamHandler sets the protocol handler on the Host's Mux.
// This is equivalent to:
//   host.Mux().SetHandler(proto, handler)
// (Threadsafe)
func (h *BasicHost) SetStreamHandler(pid protocol.ID, handler inet.StreamHandler) {
295
296
	h.Mux().AddHandler(string(pid), func(p string, rwc io.ReadWriteCloser) error {
		is := rwc.(inet.Stream)
297
		is.SetProtocol(protocol.ID(p))
298
		handler(is)
Jeromy's avatar
Jeromy committed
299
300
		return nil
	})
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
301
302
}

303
304
305
306
307
// SetStreamHandlerMatch sets the protocol handler on the Host's Mux
// using a matching function to do protocol comparisons. The handler is
// registered with Mux().AddHandlerWithFunc; when it fires, the stream is
// tagged with the protocol string it was matched under and passed to handler.
func (h *BasicHost) SetStreamHandlerMatch(pid protocol.ID, m func(string) bool, handler inet.StreamHandler) {
	adapter := func(negotiated string, rwc io.ReadWriteCloser) error {
		// This assertion panics if the muxer passes something that is not
		// an inet.Stream.
		stream := rwc.(inet.Stream)
		stream.SetProtocol(protocol.ID(negotiated))
		handler(stream)
		return nil
	}
	h.Mux().AddHandlerWithFunc(string(pid), m, adapter)
}

314
// RemoveStreamHandler returns ..
Jeromy's avatar
Jeromy committed
315
func (h *BasicHost) RemoveStreamHandler(pid protocol.ID) {
Jeromy's avatar
Jeromy committed
316
	h.Mux().RemoveHandler(string(pid))
Jeromy's avatar
Jeromy committed
317
318
}

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
319
320
321
322
// NewStream opens a new stream to given peer p, and writes a p2p/protocol
// header with given protocol.ID. If there is no connection to p, attempts
// to create one. If ProtocolID is "", writes no header.
// (Threadsafe)
323
func (h *BasicHost) NewStream(ctx context.Context, p peer.ID, pids ...protocol.ID) (inet.Stream, error) {
324
325
326
327
328
	pref, err := h.preferredProtocol(p, pids)
	if err != nil {
		return nil, err
	}

329
330
331
332
	if pref != "" {
		return h.newStream(ctx, p, pref)
	}

333
	var protoStrs []string
334
	for _, pid := range pids {
335
336
		protoStrs = append(protoStrs, string(pid))
	}
337

338
339
340
341
	s, err := h.Network().NewStream(ctx, p)
	if err != nil {
		return nil, err
	}
342

343
344
	selected, err := msmux.SelectOneOf(protoStrs, s)
	if err != nil {
Steven Allen's avatar
Steven Allen committed
345
		s.Reset()
346
		return nil, err
347
	}
348
349
	selpid := protocol.ID(selected)
	s.SetProtocol(selpid)
350
	h.Peerstore().AddProtocols(p, selected)
351

Jeromy's avatar
Jeromy committed
352
	return s, nil
353
354
}

355
356
357
358
// pidsToStrings converts a slice of protocol IDs into the equivalent slice
// of strings, preserving order. The result is never nil.
func pidsToStrings(pids []protocol.ID) []string {
	strs := make([]string, 0, len(pids))
	for _, id := range pids {
		strs = append(strs, string(id))
	}
	return strs
}

363
364
365
366
367
// preferredProtocol asks the peerstore which of pids peer p is recorded as
// supporting and returns the first one reported. It returns "" when none
// is reported, and the peerstore's error if the lookup fails.
func (h *BasicHost) preferredProtocol(p peer.ID, pids []protocol.ID) (protocol.ID, error) {
	supported, err := h.Peerstore().SupportsProtocols(p, pidsToStrings(pids)...)
	if err != nil {
		return "", err
	}
	if len(supported) == 0 {
		return "", nil
	}
	return protocol.ID(supported[0]), nil
}

func (h *BasicHost) newStream(ctx context.Context, p peer.ID, pid protocol.ID) (inet.Stream, error) {
378
	s, err := h.Network().NewStream(ctx, p)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
379
380
381
382
	if err != nil {
		return nil, err
	}

383
	s.SetProtocol(pid)
384

Jeromy's avatar
Jeromy committed
385
	lzcon := msmux.NewMSSelect(s, string(pid))
Jeromy's avatar
Jeromy committed
386
	return &streamWrapper{
Jeromy's avatar
Jeromy committed
387
		Stream: s,
Jeromy's avatar
Jeromy committed
388
389
		rw:     lzcon,
	}, nil
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
390
391
392
}

// Connect ensures there is a connection between this host and the peer with
393
394
395
396
// given peer.ID. If there is not an active connection, Connect will issue a
// h.Network.Dial, and block until a connection is open, or an error is returned.
// Connect will absorb the addresses in pi into its internal peerstore.
// It will also resolve any /dns4, /dns6, and /dnsaddr addresses.
Jeromy's avatar
Jeromy committed
397
func (h *BasicHost) Connect(ctx context.Context, pi pstore.PeerInfo) error {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
398
	// absorb addresses into peerstore
Jeromy's avatar
Jeromy committed
399
	h.Peerstore().AddAddrs(pi.ID, pi.Addrs, pstore.TempAddrTTL)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
400

401
	if h.Network().Connectedness(pi.ID) == inet.Connected {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
402
403
404
		return nil
	}

405
406
407
408
409
410
	resolved, err := h.resolveAddrs(ctx, h.Peerstore().PeerInfo(pi.ID))
	if err != nil {
		return err
	}
	h.Peerstore().AddAddrs(pi.ID, resolved, pstore.TempAddrTTL)

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
411
412
413
	return h.dialPeer(ctx, pi.ID)
}

414
415
416
417
418
419
420
421
422
423
// resolveAddrs returns the addresses in pi together with the results of
// resolving every address that madns.Matches reports as resolvable.
//
// Each matching address is encapsulated with the peer's /ipfs/<id>
// component before being handed to the host's resolver, and each resolved
// address is parsed back with pstore.InfoFromP2pAddr to extract its
// addresses. Resolution and parse failures are logged and do not abort the
// call; an error is returned only when the peer-ID multiaddr cannot be built.
func (h *BasicHost) resolveAddrs(ctx context.Context, pi pstore.PeerInfo) ([]ma.Multiaddr, error) {
	proto := ma.ProtocolWithCode(ma.P_IPFS).Name
	p2paddr, err := ma.NewMultiaddr("/" + proto + "/" + pi.ID.Pretty())
	if err != nil {
		return nil, err
	}

	var addrs []ma.Multiaddr
	for _, addr := range pi.Addrs {
		// The original address is always kept, resolvable or not.
		addrs = append(addrs, addr)
		if !madns.Matches(addr) {
			continue
		}

		reqaddr := addr.Encapsulate(p2paddr)
		resaddrs, err := h.maResolver.Resolve(ctx, reqaddr)
		if err != nil {
			// Fall through: any addresses returned alongside the error
			// are still processed below.
			log.Infof("error resolving %s: %s", reqaddr, err)
		}
		for _, res := range resaddrs {
			info, err := pstore.InfoFromP2pAddr(res)
			if err != nil {
				log.Infof("error parsing %s: %s", res, err)
				// info is not usable after a parse failure; skip it
				// instead of dereferencing it.
				continue
			}
			addrs = append(addrs, info.Addrs...)
		}
	}

	return addrs, nil
}

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
445
446
447
// dialPeer opens a connection to peer, and makes sure to identify
// the connection once it has been opened.
func (h *BasicHost) dialPeer(ctx context.Context, p peer.ID) error {
448
	log.Debugf("host %s dialing %s", h.ID(), p)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
449
450
451
452
453
	c, err := h.Network().DialPeer(ctx, p)
	if err != nil {
		return err
	}

454
455
456
457
	// Clear protocols on connecting to new peer to avoid issues caused
	// by misremembering protocols between reconnects
	h.Peerstore().SetProtocols(p)

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
458
459
460
461
462
463
464
465
466
467
468
469
470
471
	// identify the connection before returning.
	done := make(chan struct{})
	go func() {
		h.ids.IdentifyConn(c)
		close(done)
	}()

	// respect don contexteone
	select {
	case <-done:
	case <-ctx.Done():
		return ctx.Err()
	}

Jeromy's avatar
Jeromy committed
472
	log.Debugf("host %s finished dialing %s", h.ID(), p)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
473
474
475
	return nil
}

476
func (h *BasicHost) ConnManager() ifconnmgr.ConnManager {
Jeromy's avatar
Jeromy committed
477
478
479
	return h.cmgr
}

480
481
// Addrs returns listening addresses that are safe to announce to the network.
// The output is the same as AllAddrs, but processed by AddrsFactory.
482
func (h *BasicHost) Addrs() []ma.Multiaddr {
483
	return h.AddrsFactory(h.AllAddrs())
484
485
}

486
487
488
489
490
// mergeAddrs merges input address lists, leave only unique addresses
func mergeAddrs(addrLists ...[]ma.Multiaddr) (uniqueAddrs []ma.Multiaddr) {
	exists := make(map[string]bool)
	for _, addrList := range addrLists {
		for _, addr := range addrList {
Can ZHANG's avatar
Can ZHANG committed
491
492
			k := string(addr.Bytes())
			if exists[k] {
493
494
				continue
			}
Can ZHANG's avatar
Can ZHANG committed
495
			exists[k] = true
496
497
498
499
500
501
			uniqueAddrs = append(uniqueAddrs, addr)
		}
	}
	return uniqueAddrs
}

502
503
504
// AllAddrs returns all the addresses of BasicHost at this moment in time.
// It's ok to not include addresses if they're not available to be used now.
//
// The result is the de-duplicated union of:
//  * the network's interface listen addresses,
//  * the addresses the identify service reports as observed for us, and
//  * the external addresses of the NAT, when a NAT manager is configured
//    and its NAT is available.
//
// A failure to retrieve the listen addresses is logged and otherwise ignored.
func (h *BasicHost) AllAddrs() []ma.Multiaddr {
	listenAddrs, err := h.Network().InterfaceListenAddresses()
	if err != nil {
		// Best effort: log the cause and continue with the other sources.
		log.Debugf("error retrieving network interface addrs: %s", err)
	}

	var observedAddrs []ma.Multiaddr
	if h.ids != nil {
		// peer observed addresses
		observedAddrs = h.ids.OwnObservedAddrs()
	}

	var natAddrs []ma.Multiaddr
	// natmgr is nil if we do not use nat option;
	// h.natmgr.NAT() is nil if not ready, or no nat is available.
	if h.natmgr != nil {
		// Fetch the NAT once so the nil check and the use see the same value.
		if nat := h.natmgr.NAT(); nat != nil {
			natAddrs = nat.ExternalAddrs()
		}
	}

	return mergeAddrs(listenAddrs, observedAddrs, natAddrs)
}

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
524
525
// Close shuts down the Host's services (network, etc).
func (h *BasicHost) Close() error {
526
	return h.proc.Close()
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
527
}
Jeromy's avatar
Jeromy committed
528

Jeromy's avatar
Jeromy committed
529
530
531
532
533
534
535
536
537
538
539
540
// streamWrapper is an inet.Stream whose Read and Write are redirected to a
// separate io.ReadWriter. The host uses it to route stream I/O through the
// connection produced by protocol negotiation, while every other Stream
// method is served by the embedded stream.
type streamWrapper struct {
	inet.Stream
	// rw receives all Read and Write calls made on the wrapper.
	rw io.ReadWriter
}

// Read reads from the wrapped io.ReadWriter rather than the embedded stream.
func (s *streamWrapper) Read(b []byte) (int, error) {
	return s.rw.Read(b)
}

// Write writes to the wrapped io.ReadWriter rather than the embedded stream.
func (s *streamWrapper) Write(b []byte) (int, error) {
	return s.rw.Write(b)
}