routed.go 5.21 KB
Newer Older
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
1
2
3
package routedhost

import (
4
	"context"
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
5
6
7
	"fmt"
	"time"

Jeromy's avatar
Jeromy committed
8
	host "github.com/libp2p/go-libp2p-host"
Jeromy's avatar
Jeromy committed
9

Jeromy's avatar
Jeromy committed
10
	logging "github.com/ipfs/go-log"
11
	circuit "github.com/libp2p/go-libp2p-circuit"
12
	ifconnmgr "github.com/libp2p/go-libp2p-interface-connmgr"
Jeromy's avatar
Jeromy committed
13
	lgbl "github.com/libp2p/go-libp2p-loggables"
Jeromy's avatar
Jeromy committed
14
	inet "github.com/libp2p/go-libp2p-net"
Jeromy's avatar
Jeromy committed
15
16
	peer "github.com/libp2p/go-libp2p-peer"
	pstore "github.com/libp2p/go-libp2p-peerstore"
Jeromy's avatar
Jeromy committed
17
	protocol "github.com/libp2p/go-libp2p-protocol"
Jeromy's avatar
Jeromy committed
18
	ma "github.com/multiformats/go-multiaddr"
Jeromy's avatar
Jeromy committed
19
	msmux "github.com/multiformats/go-multistream"
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
20
21
)

Jeromy's avatar
Jeromy committed
22
var log = logging.Logger("routedhost")
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
23
24
25
26
27
28
29
30
31
32

// AddressTTL is the expiry time for our addresses.
// We expire them quickly.
const AddressTTL = time.Second * 10

// RoutedHost is a p2p Host that includes a routing system.
// This allows the Host to find the addresses for peers when
// it does not have them.
type RoutedHost struct {
	host  host.Host // embedded other host.
Jeromy's avatar
Jeromy committed
33
34
35
36
	route Routing
}

type Routing interface {
Jeromy's avatar
Jeromy committed
37
	FindPeer(context.Context, peer.ID) (pstore.PeerInfo, error)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
38
39
}

Jeromy's avatar
Jeromy committed
40
func Wrap(h host.Host, r Routing) *RoutedHost {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
41
42
43
44
45
46
47
48
	return &RoutedHost{h, r}
}

// Connect ensures there is a connection between this host and the peer with
// given peer.ID. See (host.Host).Connect for more information.
//
// RoutedHost's Connect differs in that if the host has no addresses for a
// given peer, it will use its routing system to try to find some.
Jeromy's avatar
Jeromy committed
49
func (rh *RoutedHost) Connect(ctx context.Context, pi pstore.PeerInfo) error {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
50
	// first, check if we're already connected.
51
	if rh.Network().Connectedness(pi.ID) == inet.Connected {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
52
53
54
55
56
		return nil
	}

	// if we were given some addresses, keep + use them.
	if len(pi.Addrs) > 0 {
Jeromy's avatar
Jeromy committed
57
		rh.Peerstore().AddAddrs(pi.ID, pi.Addrs, pstore.TempAddrTTL)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
58
59
60
61
62
63
	}

	// Check if we have some addresses in our recent memory.
	addrs := rh.Peerstore().Addrs(pi.ID)
	if len(addrs) < 1 {
		// no addrs? find some with the routing system.
64
65
		var err error
		addrs, err = rh.findPeerAddrs(ctx, pi.ID)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
66
67
68
69
70
		if err != nil {
			return err
		}
	}

71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
	// Issue 448: if our address set includes specific relay addrs, we need
	// to make sure the relay's addr itself is in the peerstore or else
	// we wont be able to dial it.
	for _, addr := range addrs {
		_, err := addr.ValueForProtocol(circuit.P_CIRCUIT)
		if err != nil {
			// not a relay address
			continue
		}

		relay, err := addr.ValueForProtocol(ma.P_P2P)
		if err != nil {
			// not a specific relay address
			continue
		}

		relayID, err := peer.IDFromString(relay)
		if err != nil {
			log.Debugf("failed to parse relay ID in address %s: %s", relay, err)
			continue
		}

vyzo's avatar
vyzo committed
93
94
95
96
97
		if relayID == pi.ID {
			// it's an old style p2p-circuit address that includes the peer
			continue
		}

98
99
100
101
102
103
104
105
106
107
108
109
110
111
		if len(rh.Peerstore().Addrs(relayID)) > 0 {
			// we already have addrs for this relay
			continue
		}

		relayAddrs, err := rh.findPeerAddrs(ctx, relayID)
		if err != nil {
			log.Debugf("failed to find relay %s: %s", relay, err)
			continue
		}

		rh.Peerstore().AddAddrs(relayID, relayAddrs, pstore.AddressTTL)
	}

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
112
113
114
115
116
	// if we're here, we got some addrs. let's use our wrapped host to connect.
	pi.Addrs = addrs
	return rh.host.Connect(ctx, pi)
}

117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
func (rh *RoutedHost) findPeerAddrs(ctx context.Context, id peer.ID) ([]ma.Multiaddr, error) {
	pi, err := rh.route.FindPeer(ctx, id)
	if err != nil {
		return nil, err // couldnt find any :(
	}

	if pi.ID != id {
		err = fmt.Errorf("routing failure: provided addrs for different peer")
		logRoutingErrDifferentPeers(ctx, id, pi.ID, err)
		return nil, err
	}

	return pi.Addrs, nil
}

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
132
133
134
135
136
137
138
139
140
141
142
func logRoutingErrDifferentPeers(ctx context.Context, wanted, got peer.ID, err error) {
	lm := make(lgbl.DeferredMap)
	lm["error"] = err
	lm["wantedPeer"] = func() interface{} { return wanted.Pretty() }
	lm["gotPeer"] = func() interface{} { return got.Pretty() }
	log.Event(ctx, "routingError", lm)
}

func (rh *RoutedHost) ID() peer.ID {
	return rh.host.ID()
}
Jeromy's avatar
Jeromy committed
143

Jeromy's avatar
Jeromy committed
144
func (rh *RoutedHost) Peerstore() pstore.Peerstore {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
145
146
	return rh.host.Peerstore()
}
Jeromy's avatar
Jeromy committed
147

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
148
149
150
func (rh *RoutedHost) Addrs() []ma.Multiaddr {
	return rh.host.Addrs()
}
Jeromy's avatar
Jeromy committed
151

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
152
153
154
func (rh *RoutedHost) Network() inet.Network {
	return rh.host.Network()
}
Jeromy's avatar
Jeromy committed
155

Jeromy's avatar
Jeromy committed
156
func (rh *RoutedHost) Mux() *msmux.MultistreamMuxer {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
157
158
	return rh.host.Mux()
}
Jeromy's avatar
Jeromy committed
159

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
160
161
162
func (rh *RoutedHost) SetStreamHandler(pid protocol.ID, handler inet.StreamHandler) {
	rh.host.SetStreamHandler(pid, handler)
}
Jeromy's avatar
Jeromy committed
163

164
165
166
167
func (rh *RoutedHost) SetStreamHandlerMatch(pid protocol.ID, m func(string) bool, handler inet.StreamHandler) {
	rh.host.SetStreamHandlerMatch(pid, m, handler)
}

Jeromy's avatar
Jeromy committed
168
169
170
171
func (rh *RoutedHost) RemoveStreamHandler(pid protocol.ID) {
	rh.host.RemoveStreamHandler(pid)
}

172
func (rh *RoutedHost) NewStream(ctx context.Context, p peer.ID, pids ...protocol.ID) (inet.Stream, error) {
173
174
175
176
177
178
	// Ensure we have a connection, with peer addresses resolved by the routing system (#207)
	// It is not sufficient to let the underlying host connect, it will most likely not have
	// any addresses for the peer without any prior connections.
	err := rh.Connect(ctx, pstore.PeerInfo{ID: p})
	if err != nil {
		return nil, err
179
180
	}

181
	return rh.host.NewStream(ctx, p, pids...)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
182
183
184
185
186
}
func (rh *RoutedHost) Close() error {
	// no need to close IpfsRouting. we dont own it.
	return rh.host.Close()
}
187
func (rh *RoutedHost) ConnManager() ifconnmgr.ConnManager {
Jeromy's avatar
Jeromy committed
188
189
	return rh.host.ConnManager()
}
Jeromy's avatar
Jeromy committed
190

191
var _ (host.Host) = (*RoutedHost)(nil)