basic_host.go 9.77 KB
Newer Older
1
// Package basichost provides a basic libp2p Host implementation.
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
2
3
4
package basichost

import (
5
	"context"
6
	"io"
7
	"time"
8

Jeromy's avatar
Jeromy committed
9
10
	identify "github.com/libp2p/go-libp2p/p2p/protocol/identify"

Jeromy's avatar
Jeromy committed
11
12
	logging "github.com/ipfs/go-log"
	goprocess "github.com/jbenet/goprocess"
Jeromy's avatar
Jeromy committed
13
14
15
	metrics "github.com/libp2p/go-libp2p-metrics"
	mstream "github.com/libp2p/go-libp2p-metrics/stream"
	inet "github.com/libp2p/go-libp2p-net"
Jeromy's avatar
Jeromy committed
16
17
	peer "github.com/libp2p/go-libp2p-peer"
	pstore "github.com/libp2p/go-libp2p-peerstore"
Jeromy's avatar
Jeromy committed
18
	protocol "github.com/libp2p/go-libp2p-protocol"
Jeromy's avatar
Jeromy committed
19
	ma "github.com/multiformats/go-multiaddr"
Jeromy's avatar
Jeromy committed
20
	msmux "github.com/multiformats/go-multistream"
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
21
22
)

23
var log = logging.Logger("basichost")
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
24

25
26
// NegotiateTimeout sets a deadline to perform stream handling
// negotiation (exchange of protocol IDs)
27
28
var NegotiateTimeout = time.Second * 60

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
29
// Option is a type used to pass in options to the host.
30
31
32
type Option int

const (
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
33
34
35
36
37
	// NATPortMap makes the host attempt to open port-mapping in NAT devices
	// for all its listeners. Pass in this option in the constructor to
	// asynchronously a) find a gateway, b) open port mappings, c) republish
	// port mappings periodically. The NATed addresses are included in the
	// Host's Addrs() list.
38
39
40
	NATPortMap Option = iota
)

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
41
42
43
44
45
// BasicHost is the basic implementation of the host.Host interface. This
// particular host implementation:
//  * uses a protocol muxer to mux per-protocol streams
//  * uses an identity service to send + receive node information
//  * uses a nat service to establish NAT port mappings
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
46
47
type BasicHost struct {
	network inet.Network
Jeromy's avatar
Jeromy committed
48
	mux     *msmux.MultistreamMuxer
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
49
	ids     *identify.IDService
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
50
	natmgr  *natManager
51

52
53
	NegotiateTimeout time.Duration

54
	proc goprocess.Process
Jeromy's avatar
Jeromy committed
55
56

	bwc metrics.Reporter
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
57
58
}

59
60
61
// New constructs and sets up a new *BasicHost with given Network. It can take
// additional options. Currently NATPortMap (see "Constants"
// documentation) and a custom metrics.Reporter are supported.
Jeromy's avatar
Jeromy committed
62
func New(net inet.Network, opts ...interface{}) *BasicHost {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
63
	h := &BasicHost{
64
65
66
		network:          net,
		mux:              msmux.NewMultistreamMuxer(),
		NegotiateTimeout: NegotiateTimeout,
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
67
68
	}

69
	h.proc = goprocess.WithTeardown(func() error {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
70
71
72
73
		if h.natmgr != nil {
			h.natmgr.Close()
		}

74
75
76
		return h.Network().Close()
	})

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
77
78
	// setup host services
	h.ids = identify.NewIDService(h)
Jeromy's avatar
Jeromy committed
79

80
	for _, o := range opts {
Jeromy's avatar
Jeromy committed
81
82
83
84
85
86
87
88
		switch o := o.(type) {
		case Option:
			switch o {
			case NATPortMap:
				h.natmgr = newNatManager(h)
			}
		case metrics.Reporter:
			h.bwc = o
89
90
91
		}
	}

Jeromy's avatar
Jeromy committed
92
93
	h.ids.Reporter = h.bwc

Jeromy's avatar
Jeromy committed
94
95
96
	net.SetConnHandler(h.newConnHandler)
	net.SetStreamHandler(h.newStreamHandler)

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
97
98
99
100
101
	return h
}

// newConnHandler is the remote-opened conn handler for inet.Network
func (h *BasicHost) newConnHandler(c inet.Conn) {
102
103
104
	// Clear protocols on connecting to new peer to avoid issues caused
	// by misremembering protocols between reconnects
	h.Peerstore().SetProtocols(c.RemotePeer())
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
105
106
107
108
	h.ids.IdentifyConn(c)
}

// newStreamHandler is the remote-opened stream handler for inet.Network
Jeromy's avatar
Jeromy committed
109
// TODO: this feels a bit wonky
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
110
func (h *BasicHost) newStreamHandler(s inet.Stream) {
111
	before := time.Now()
112
113
114
115
116
117
118
119
120

	if h.NegotiateTimeout != 0 {
		if err := s.SetDeadline(time.Now().Add(h.NegotiateTimeout)); err != nil {
			log.Error("setting stream deadline: ", err)
			s.Close()
			return
		}
	}

121
	lzc, protoID, handle, err := h.Mux().NegotiateLazy(s)
122
	took := time.Now().Sub(before)
Jeromy's avatar
Jeromy committed
123
	if err != nil {
124
		if err == io.EOF {
125
126
127
128
129
			logf := log.Debugf
			if took > time.Second*10 {
				logf = log.Warningf
			}
			logf("protocol EOF: %s (took %s)", s.Conn().RemotePeer(), took)
130
		} else {
131
			log.Warning("protocol mux failed: %s (took %s)", err, took)
132
		}
133
		s.Close()
Jeromy's avatar
Jeromy committed
134
135
		return
	}
136

137
138
139
140
141
	s = &streamWrapper{
		Stream: s,
		rw:     lzc,
	}

142
143
144
145
146
147
148
149
	if h.NegotiateTimeout != 0 {
		if err := s.SetDeadline(time.Time{}); err != nil {
			log.Error("resetting stream deadline: ", err)
			s.Close()
			return
		}
	}

150
	s.SetProtocol(protocol.ID(protoID))
Jeromy's avatar
Jeromy committed
151

152
153
154
155
	if h.bwc != nil {
		s = mstream.WrapStream(s, h.bwc)
	}
	log.Debugf("protocol negotiation took %s", took)
Jeromy's avatar
Jeromy committed
156

157
	go handle(protoID, s)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
158
159
160
161
162
163
164
165
}

// ID returns the (local) peer.ID associated with this Host
func (h *BasicHost) ID() peer.ID {
	return h.Network().LocalPeer()
}

// Peerstore returns the Host's repository of Peer Addresses and Keys.
Jeromy's avatar
Jeromy committed
166
func (h *BasicHost) Peerstore() pstore.Peerstore {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
167
168
169
	return h.Network().Peerstore()
}

170
// Network returns the Network interface of the Host
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
171
172
173
174
func (h *BasicHost) Network() inet.Network {
	return h.network
}

175
// Mux returns the Mux multiplexing incoming streams to protocol handlers.
Jeromy's avatar
Jeromy committed
176
func (h *BasicHost) Mux() *msmux.MultistreamMuxer {
Brian Tiger Chow's avatar
Brian Tiger Chow committed
177
	return h.mux
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
178
179
}

180
// IDService returns the IDService for this host.
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
181
182
183
184
185
186
187
188
189
func (h *BasicHost) IDService() *identify.IDService {
	return h.ids
}

// SetStreamHandler sets the protocol handler on the Host's Mux.
// This is equivalent to:
//   host.Mux().SetHandler(proto, handler)
// (Threadsafe)
func (h *BasicHost) SetStreamHandler(pid protocol.ID, handler inet.StreamHandler) {
190
191
	h.Mux().AddHandler(string(pid), func(p string, rwc io.ReadWriteCloser) error {
		is := rwc.(inet.Stream)
192
		is.SetProtocol(protocol.ID(p))
193
		handler(is)
Jeromy's avatar
Jeromy committed
194
195
		return nil
	})
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
196
197
}

198
199
200
201
202
// SetStreamHandlerMatch sets the protocol handler on the Host's Mux
// using a matching function to do protocol comparisons
func (h *BasicHost) SetStreamHandlerMatch(pid protocol.ID, m func(string) bool, handler inet.StreamHandler) {
	h.Mux().AddHandlerWithFunc(string(pid), m, func(p string, rwc io.ReadWriteCloser) error {
		is := rwc.(inet.Stream)
203
		is.SetProtocol(protocol.ID(p))
204
205
206
207
208
		handler(is)
		return nil
	})
}

209
// RemoveStreamHandler removes the handler matching the given protocol ID.
Jeromy's avatar
Jeromy committed
210
func (h *BasicHost) RemoveStreamHandler(pid protocol.ID) {
Jeromy's avatar
Jeromy committed
211
	h.Mux().RemoveHandler(string(pid))
Jeromy's avatar
Jeromy committed
212
213
}

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
214
215
216
217
// NewStream opens a new stream to given peer p, and writes a p2p/protocol
// header with given protocol.ID. If there is no connection to p, attempts
// to create one. If ProtocolID is "", writes no header.
// (Threadsafe)
218
func (h *BasicHost) NewStream(ctx context.Context, p peer.ID, pids ...protocol.ID) (inet.Stream, error) {
219
220
221
222
223
	pref, err := h.preferredProtocol(p, pids)
	if err != nil {
		return nil, err
	}

224
225
226
227
	if pref != "" {
		return h.newStream(ctx, p, pref)
	}

228
	var protoStrs []string
229
	for _, pid := range pids {
230
231
		protoStrs = append(protoStrs, string(pid))
	}
232

233
234
235
236
	s, err := h.Network().NewStream(ctx, p)
	if err != nil {
		return nil, err
	}
237

238
239
240
241
	selected, err := msmux.SelectOneOf(protoStrs, s)
	if err != nil {
		s.Close()
		return nil, err
242
	}
243
244
	selpid := protocol.ID(selected)
	s.SetProtocol(selpid)
245
	h.Peerstore().AddProtocols(p, selected)
246

Jeromy's avatar
Jeromy committed
247
248
249
250
251
	if h.bwc != nil {
		s = mstream.WrapStream(s, h.bwc)
	}

	return s, nil
252
253
}

254
255
256
257
func pidsToStrings(pids []protocol.ID) []string {
	out := make([]string, len(pids))
	for i, p := range pids {
		out[i] = string(p)
258
	}
259
	return out
260
261
}

262
263
264
265
266
func (h *BasicHost) preferredProtocol(p peer.ID, pids []protocol.ID) (protocol.ID, error) {
	pidstrs := pidsToStrings(pids)
	supported, err := h.Peerstore().SupportsProtocols(p, pidstrs...)
	if err != nil {
		return "", err
267
268
	}

269
270
271
272
273
	var out protocol.ID
	if len(supported) > 0 {
		out = protocol.ID(supported[0])
	}
	return out, nil
274
275
276
}

func (h *BasicHost) newStream(ctx context.Context, p peer.ID, pid protocol.ID) (inet.Stream, error) {
277
	s, err := h.Network().NewStream(ctx, p)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
278
279
280
281
	if err != nil {
		return nil, err
	}

282
	s.SetProtocol(pid)
283

Jeromy's avatar
Jeromy committed
284
285
286
	if h.bwc != nil {
		s = mstream.WrapStream(s, h.bwc)
	}
Jeromy's avatar
Jeromy committed
287

Jeromy's avatar
Jeromy committed
288
	lzcon := msmux.NewMSSelect(s, string(pid))
Jeromy's avatar
Jeromy committed
289
	return &streamWrapper{
Jeromy's avatar
Jeromy committed
290
		Stream: s,
Jeromy's avatar
Jeromy committed
291
292
		rw:     lzcon,
	}, nil
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
293
294
295
296
297
298
}

// Connect ensures there is a connection between this host and the peer with
// given peer.ID. Connect will absorb the addresses in pi into its internal
// peerstore. If there is not an active connection, Connect will issue a
// h.Network.Dial, and block until a connection is open, or an error is
299
// returned.
Jeromy's avatar
Jeromy committed
300
func (h *BasicHost) Connect(ctx context.Context, pi pstore.PeerInfo) error {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
301
302

	// absorb addresses into peerstore
Jeromy's avatar
Jeromy committed
303
	h.Peerstore().AddAddrs(pi.ID, pi.Addrs, pstore.TempAddrTTL)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321

	cs := h.Network().ConnsToPeer(pi.ID)
	if len(cs) > 0 {
		return nil
	}

	return h.dialPeer(ctx, pi.ID)
}

// dialPeer opens a connection to peer, and makes sure to identify
// the connection once it has been opened.
func (h *BasicHost) dialPeer(ctx context.Context, p peer.ID) error {
	log.Debugf("host %s dialing %s", h.ID, p)
	c, err := h.Network().DialPeer(ctx, p)
	if err != nil {
		return err
	}

322
323
324
325
	// Clear protocols on connecting to new peer to avoid issues caused
	// by misremembering protocols between reconnects
	h.Peerstore().SetProtocols(p)

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
326
327
328
329
330
331
332
333
334
335
336
337
338
339
	// identify the connection before returning.
	done := make(chan struct{})
	go func() {
		h.ids.IdentifyConn(c)
		close(done)
	}()

	// respect don contexteone
	select {
	case <-done:
	case <-ctx.Done():
		return ctx.Err()
	}

Jeromy's avatar
Jeromy committed
340
	log.Debugf("host %s finished dialing %s", h.ID(), p)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
341
342
343
	return nil
}

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
344
345
// Addrs returns all the addresses of BasicHost at this moment in time.
// It's ok to not include addresses if they're not available to be used now.
346
347
348
349
350
351
func (h *BasicHost) Addrs() []ma.Multiaddr {
	addrs, err := h.Network().InterfaceListenAddresses()
	if err != nil {
		log.Debug("error retrieving network interface addrs")
	}

352
353
354
355
	if h.ids != nil { // add external observed addresses
		addrs = append(addrs, h.ids.OwnObservedAddrs()...)
	}

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
356
357
358
359
360
	if h.natmgr != nil { // natmgr is nil if we do not use nat option.
		nat := h.natmgr.NAT()
		if nat != nil { // nat is nil if not ready, or no nat is available.
			addrs = append(addrs, nat.ExternalAddrs()...)
		}
361
362
363
364
365
	}

	return addrs
}

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
366
367
// Close shuts down the Host's services (network, etc).
func (h *BasicHost) Close() error {
368
	return h.proc.Close()
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
369
}
Jeromy's avatar
Jeromy committed
370

371
// GetBandwidthReporter exposes the Host's bandiwth metrics reporter
Jeromy's avatar
Jeromy committed
372
373
374
func (h *BasicHost) GetBandwidthReporter() metrics.Reporter {
	return h.bwc
}
Jeromy's avatar
Jeromy committed
375
376
377
378
379
380
381
382
383
384
385
386
387

type streamWrapper struct {
	inet.Stream
	rw io.ReadWriter
}

func (s *streamWrapper) Read(b []byte) (int, error) {
	return s.rw.Read(b)
}

func (s *streamWrapper) Write(b []byte) (int, error) {
	return s.rw.Write(b)
}