Commit a1240fdd authored by Jeromy Johnson's avatar Jeromy Johnson Committed by GitHub
Browse files

Merge pull request #184 from libp2p/feat/rm-relay

Remove relay protocol (and roadmap.md)
parents 51ad8945 5a36062c
......@@ -6,7 +6,6 @@ import (
"time"
identify "github.com/libp2p/go-libp2p/p2p/protocol/identify"
relay "github.com/libp2p/go-libp2p/p2p/protocol/relay"
logging "github.com/ipfs/go-log"
goprocess "github.com/jbenet/goprocess"
......@@ -40,13 +39,11 @@ const (
// particular host implementation:
// * uses a protocol muxer to mux per-protocol streams
// * uses an identity service to send + receive node information
// * uses a relay service to allow hosts to relay conns for each other
// * uses a nat service to establish NAT port mappings
type BasicHost struct {
network inet.Network
mux *msmux.MultistreamMuxer
ids *identify.IDService
relay *relay.RelayService
natmgr *natManager
NegotiateTimeout time.Duration
......@@ -75,12 +72,6 @@ func New(net inet.Network, opts ...interface{}) *BasicHost {
// setup host services
h.ids = identify.NewIDService(h)
muxh := h.Mux().Handle
handle := func(s inet.Stream) {
muxh(s)
}
h.relay = relay.NewRelayService(h, handle)
for _, o := range opts {
switch o := o.(type) {
case Option:
......@@ -300,7 +291,7 @@ func (h *BasicHost) newStream(ctx context.Context, p peer.ID, pid protocol.ID) (
// given peer.ID. Connect will absorb the addresses in pi into its internal
// peerstore. If there is not an active connection, Connect will issue a
// h.Network.Dial, and block until a connection is open, or an error is
// returned. // TODO: Relay + NAT.
// returned.
func (h *BasicHost) Connect(ctx context.Context, pi pstore.PeerInfo) error {
// absorb addresses into peerstore
......
package relay
import (
"context"
"fmt"
"io"
"time"
host "github.com/libp2p/go-libp2p-host"
logging "github.com/ipfs/go-log"
inet "github.com/libp2p/go-libp2p-net"
peer "github.com/libp2p/go-libp2p-peer"
protocol "github.com/libp2p/go-libp2p-protocol"
mh "github.com/multiformats/go-multihash"
)
var log = logging.Logger("protocol/relay")
// ID is the protocol.ID of the Relay Service.
const ID protocol.ID = "/ipfs/relay/line/0.1.0"
// Relay is a structure that implements ProtocolRelay.
// It is a simple relay service which forwards traffic
// between two directly connected peers.
//
// the protocol is very simple:
//
// /ipfs/relay\n
// <multihash src id>
// <multihash dst id>
// <data stream>
//
type RelayService struct {
host host.Host
handler inet.StreamHandler // for streams sent to us locally.
}
func NewRelayService(h host.Host, sh inet.StreamHandler) *RelayService {
s := &RelayService{
host: h,
handler: sh,
}
h.SetStreamHandler(ID, s.requestHandler)
return s
}
// requestHandler is the function called by clients
func (rs *RelayService) requestHandler(s inet.Stream) {
if err := rs.handleStream(s); err != nil {
log.Debugf("RelayService error:", err)
}
}
// handleStream is our own handler, which returns an error for simplicity.
func (rs *RelayService) handleStream(s inet.Stream) error {
defer s.Close()
// read the header (src and dst peer.IDs)
src, dst, err := ReadHeader(s)
if err != nil {
return fmt.Errorf("stream with bad header: %s", err)
}
local := rs.host.ID()
switch {
case src == local:
return fmt.Errorf("relaying from self")
case dst == local: // it's for us! yaaay.
log.Debugf("%s consuming stream from %s", local, src)
return rs.consumeStream(s)
default: // src and dst are not local. relay it.
log.Debugf("%s relaying stream %s <--> %s", local, src, dst)
return rs.pipeStream(src, dst, s)
}
}
// consumeStream connects streams directed to the local peer
// to our handler, with the header now stripped (read).
func (rs *RelayService) consumeStream(s inet.Stream) error {
rs.handler(s) // boom.
return nil
}
// pipeStream relays over a stream to a remote peer. It's like `cat`
func (rs *RelayService) pipeStream(src, dst peer.ID, s inet.Stream) error {
// TODO: find a good way to pass contexts into here
nsctx, cancel := context.WithTimeout(context.TODO(), time.Second*30)
defer cancel()
s2, err := rs.openStreamToPeer(nsctx, dst)
if err != nil {
return fmt.Errorf("failed to open stream to peer: %s -- %s", dst, err)
}
cancel() // cancel here because this function might last a while
if err := WriteHeader(s2, src, dst); err != nil {
return err
}
// connect the series of tubes.
done := make(chan retio, 2)
go func() {
n, err := io.Copy(s2, s)
done <- retio{n, err}
}()
go func() {
n, err := io.Copy(s, s2)
done <- retio{n, err}
}()
r1 := <-done
r2 := <-done
log.Infof("%s relayed %d/%d bytes between %s and %s", rs.host.ID(), r1.n, r2.n, src, dst)
if r1.err != nil {
return r1.err
}
return r2.err
}
// openStreamToPeer opens a pipe to a remote endpoint
// for now, can only open streams to directly connected peers.
// maybe we can do some routing later on.
func (rs *RelayService) openStreamToPeer(ctx context.Context, p peer.ID) (inet.Stream, error) {
return rs.host.NewStream(ctx, p, ID)
}
func ReadHeader(r io.Reader) (src, dst peer.ID, err error) {
mhr := mh.NewReader(r)
s, err := mhr.ReadMultihash()
if err != nil {
return "", "", err
}
d, err := mhr.ReadMultihash()
if err != nil {
return "", "", err
}
return peer.ID(s), peer.ID(d), nil
}
func WriteHeader(w io.Writer, src, dst peer.ID) error {
// write header to w.
mhw := mh.NewWriter(w)
if err := mhw.WriteMultihash(mh.Multihash(src)); err != nil {
return fmt.Errorf("failed to write relay header: %s -- %s", dst, err)
}
if err := mhw.WriteMultihash(mh.Multihash(dst)); err != nil {
return fmt.Errorf("failed to write relay header: %s -- %s", dst, err)
}
return nil
}
type retio struct {
n int64
err error
}
package relay_test
import (
"context"
"io"
"testing"
bhost "github.com/libp2p/go-libp2p/p2p/host/basic"
relay "github.com/libp2p/go-libp2p/p2p/protocol/relay"
logging "github.com/ipfs/go-log"
inet "github.com/libp2p/go-libp2p-net"
testutil "github.com/libp2p/go-libp2p-netutil"
protocol "github.com/libp2p/go-libp2p-protocol"
msmux "github.com/multiformats/go-multistream"
)
var log = logging.Logger("relay_test")
func TestRelaySimple(t *testing.T) {
ctx := context.Background()
// these networks have the relay service wired in already.
n1 := bhost.New(testutil.GenSwarmNetwork(t, ctx))
n2 := bhost.New(testutil.GenSwarmNetwork(t, ctx))
n3 := bhost.New(testutil.GenSwarmNetwork(t, ctx))
n1p := n1.ID()
n2p := n2.ID()
n3p := n3.ID()
n2pi := n2.Peerstore().PeerInfo(n2p)
if err := n1.Connect(ctx, n2pi); err != nil {
t.Fatal("Failed to connect:", err)
}
if err := n3.Connect(ctx, n2pi); err != nil {
t.Fatal("Failed to connect:", err)
}
// setup handler on n3 to copy everything over to the pipe.
piper, pipew := io.Pipe()
n3.SetStreamHandler(protocol.TestingID, func(s inet.Stream) {
log.Debug("relay stream opened to n3!")
log.Debug("piping and echoing everything")
w := io.MultiWriter(s, pipew)
io.Copy(w, s)
log.Debug("closing stream")
s.Close()
})
// ok, now we can try to relay n1--->n2--->n3.
log.Debug("open relay stream")
s, err := n1.NewStream(ctx, n2p, relay.ID)
if err != nil {
t.Fatal(err)
}
// ok first thing we write the relay header n1->n3
log.Debug("write relay header")
if err := relay.WriteHeader(s, n1p, n3p); err != nil {
t.Fatal(err)
}
// ok now the header's there, we can write the next protocol header.
log.Debug("write testing header")
if err := msmux.SelectProtoOrFail(string(protocol.TestingID), s); err != nil {
t.Fatal(err)
}
// okay, now we should be able to write text, and read it out.
buf1 := []byte("abcdefghij")
buf2 := make([]byte, 10)
buf3 := make([]byte, 10)
log.Debug("write in some text.")
if _, err := s.Write(buf1); err != nil {
t.Fatal(err)
}
// read it out from the pipe.
log.Debug("read it out from the pipe.")
if _, err := io.ReadFull(piper, buf2); err != nil {
t.Fatal(err)
}
if string(buf1) != string(buf2) {
t.Fatal("should've gotten that text out of the pipe")
}
// read it out from the stream (echoed)
log.Debug("read it out from the stream (echoed).")
if _, err := io.ReadFull(s, buf3); err != nil {
t.Fatal(err)
}
if string(buf1) != string(buf3) {
t.Fatal("should've gotten that text out of the stream")
}
// sweet. relay works.
log.Debug("sweet, relay works.")
s.Close()
}
func TestRelayAcrossFour(t *testing.T) {
ctx := context.Background()
// these networks have the relay service wired in already.
n1 := bhost.New(testutil.GenSwarmNetwork(t, ctx))
n2 := bhost.New(testutil.GenSwarmNetwork(t, ctx))
n3 := bhost.New(testutil.GenSwarmNetwork(t, ctx))
n4 := bhost.New(testutil.GenSwarmNetwork(t, ctx))
n5 := bhost.New(testutil.GenSwarmNetwork(t, ctx))
n1p := n1.ID()
n2p := n2.ID()
n3p := n3.ID()
n4p := n4.ID()
n5p := n5.ID()
n2pi := n2.Peerstore().PeerInfo(n2p)
n4pi := n4.Peerstore().PeerInfo(n4p)
if err := n1.Connect(ctx, n2pi); err != nil {
t.Fatalf("Failed to dial:", err)
}
if err := n3.Connect(ctx, n2pi); err != nil {
t.Fatalf("Failed to dial:", err)
}
if err := n3.Connect(ctx, n4pi); err != nil {
t.Fatalf("Failed to dial:", err)
}
if err := n5.Connect(ctx, n4pi); err != nil {
t.Fatalf("Failed to dial:", err)
}
// setup handler on n5 to copy everything over to the pipe.
piper, pipew := io.Pipe()
n5.SetStreamHandler(protocol.TestingID, func(s inet.Stream) {
log.Debug("relay stream opened to n5!")
log.Debug("piping and echoing everything")
w := io.MultiWriter(s, pipew)
io.Copy(w, s)
log.Debug("closing stream")
s.Close()
})
// ok, now we can try to relay n1--->n2--->n3--->n4--->n5
log.Debug("open relay stream")
s, err := n1.NewStream(ctx, n2p, relay.ID)
if err != nil {
t.Fatal(err)
}
log.Debugf("write relay header n1->n3 (%s -> %s)", n1p, n3p)
if err := relay.WriteHeader(s, n1p, n3p); err != nil {
t.Fatal(err)
}
log.Debugf("write relay header n1->n4 (%s -> %s)", n1p, n4p)
if err := msmux.SelectProtoOrFail(string(relay.ID), s); err != nil {
t.Fatal(err)
}
if err := relay.WriteHeader(s, n1p, n4p); err != nil {
t.Fatal(err)
}
log.Debugf("write relay header n1->n5 (%s -> %s)", n1p, n5p)
if err := msmux.SelectProtoOrFail(string(relay.ID), s); err != nil {
t.Fatal(err)
}
if err := relay.WriteHeader(s, n1p, n5p); err != nil {
t.Fatal(err)
}
// ok now the header's there, we can write the next protocol header.
log.Debug("write testing header")
if err := msmux.SelectProtoOrFail(string(protocol.TestingID), s); err != nil {
t.Fatal(err)
}
// okay, now we should be able to write text, and read it out.
buf1 := []byte("abcdefghij")
buf2 := make([]byte, 10)
buf3 := make([]byte, 10)
log.Debug("write in some text.")
if _, err := s.Write(buf1); err != nil {
t.Fatal(err)
}
// read it out from the pipe.
log.Debug("read it out from the pipe.")
if _, err := io.ReadFull(piper, buf2); err != nil {
t.Fatal(err)
}
if string(buf1) != string(buf2) {
t.Fatal("should've gotten that text out of the pipe")
}
// read it out from the stream (echoed)
log.Debug("read it out from the stream (echoed).")
if _, err := io.ReadFull(s, buf3); err != nil {
t.Fatal(err)
}
if string(buf1) != string(buf3) {
t.Fatal("should've gotten that text out of the stream")
}
// sweet. relay works.
log.Debug("sweet, relaying across 4 works.")
s.Close()
}
func TestRelayStress(t *testing.T) {
buflen := 1 << 18
iterations := 10
ctx := context.Background()
// these networks have the relay service wired in already.
n1 := bhost.New(testutil.GenSwarmNetwork(t, ctx))
n2 := bhost.New(testutil.GenSwarmNetwork(t, ctx))
n3 := bhost.New(testutil.GenSwarmNetwork(t, ctx))
n1p := n1.ID()
n2p := n2.ID()
n3p := n3.ID()
n2pi := n2.Peerstore().PeerInfo(n2p)
if err := n1.Connect(ctx, n2pi); err != nil {
t.Fatalf("Failed to dial:", err)
}
if err := n3.Connect(ctx, n2pi); err != nil {
t.Fatalf("Failed to dial:", err)
}
// setup handler on n3 to copy everything over to the pipe.
piper, pipew := io.Pipe()
n3.SetStreamHandler(protocol.TestingID, func(s inet.Stream) {
log.Debug("relay stream opened to n3!")
log.Debug("piping and echoing everything")
w := io.MultiWriter(s, pipew)
io.Copy(w, s)
log.Debug("closing stream")
s.Close()
})
// ok, now we can try to relay n1--->n2--->n3.
log.Debug("open relay stream")
s, err := n1.NewStream(ctx, n2p, relay.ID)
if err != nil {
t.Fatal(err)
}
// ok first thing we write the relay header n1->n3
log.Debug("write relay header")
if err := relay.WriteHeader(s, n1p, n3p); err != nil {
t.Fatal(err)
}
// ok now the header's there, we can write the next protocol header.
log.Debug("write testing header")
if err := msmux.SelectProtoOrFail(string(protocol.TestingID), s); err != nil {
t.Fatal(err)
}
// okay, now write lots of text and read it back out from both
// the pipe and the stream.
buf1 := make([]byte, buflen)
buf2 := make([]byte, len(buf1))
buf3 := make([]byte, len(buf1))
fillbuf := func(buf []byte, b byte) {
for i := range buf {
buf[i] = b
}
}
for i := 0; i < iterations; i++ {
fillbuf(buf1, byte(int('a')+i))
log.Debugf("writing %d bytes (%d/%d)", len(buf1), i, iterations)
if _, err := s.Write(buf1); err != nil {
t.Fatal(err)
}
log.Debug("read it out from the pipe.")
if _, err := io.ReadFull(piper, buf2); err != nil {
t.Fatal(err)
}
if string(buf1) != string(buf2) {
t.Fatal("should've gotten that text out of the pipe")
}
// read it out from the stream (echoed)
log.Debug("read it out from the stream (echoed).")
if _, err := io.ReadFull(s, buf3); err != nil {
t.Fatal(err)
}
if string(buf1) != string(buf3) {
t.Fatal("should've gotten that text out of the stream")
}
}
log.Debug("sweet, relay works under stress.")
s.Close()
}
# go-libp2p Q2 roadmap
- [ ] websockets transport to communicate with js-ipfs
- [ ] line switching for libp2p (relay)
- [ ] add config opt-in for relaying traffic
- [ ] add 'find indirect connections' logic to dht
- [ ] Pubsub
- [ ] ship initial implementation to libp2p
- [ ] ipfs 'flare' in go-ipfs
- [ ] NAT Traversal
- [ ] Goal: No "I can't connect to my other node" issues for two weeks
- [ ] address discovery issue ipfs/#2509 ipfs/#2413
- [ ] testbed to simulate NAT scenarios
- [ ] command to clear dial backoff ipfs/#2456
- [ ] libp2p connection closing
- [ ] figure out when we want to close connections
- [ ] select 'good' number of connections while idle (based on available resources?)
- [ ] fix mocknet issues #31 #32
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment