diff --git a/p2p/host/basic/basic_host.go b/p2p/host/basic/basic_host.go index 0f662cd9338050b5f8efe34db0ccdfa617684d04..86c276cb5b6a3d8fddcf284f881a1d5b3706637a 100644 --- a/p2p/host/basic/basic_host.go +++ b/p2p/host/basic/basic_host.go @@ -6,7 +6,6 @@ import ( "time" identify "github.com/libp2p/go-libp2p/p2p/protocol/identify" - relay "github.com/libp2p/go-libp2p/p2p/protocol/relay" logging "github.com/ipfs/go-log" goprocess "github.com/jbenet/goprocess" @@ -40,13 +39,11 @@ const ( // particular host implementation: // * uses a protocol muxer to mux per-protocol streams // * uses an identity service to send + receive node information -// * uses a relay service to allow hosts to relay conns for each other // * uses a nat service to establish NAT port mappings type BasicHost struct { network inet.Network mux *msmux.MultistreamMuxer ids *identify.IDService - relay *relay.RelayService natmgr *natManager NegotiateTimeout time.Duration @@ -75,12 +72,6 @@ func New(net inet.Network, opts ...interface{}) *BasicHost { // setup host services h.ids = identify.NewIDService(h) - muxh := h.Mux().Handle - handle := func(s inet.Stream) { - muxh(s) - } - h.relay = relay.NewRelayService(h, handle) - for _, o := range opts { switch o := o.(type) { case Option: @@ -300,7 +291,7 @@ func (h *BasicHost) newStream(ctx context.Context, p peer.ID, pid protocol.ID) ( // given peer.ID. Connect will absorb the addresses in pi into its internal // peerstore. If there is not an active connection, Connect will issue a // h.Network.Dial, and block until a connection is open, or an error is -// returned. // TODO: Relay + NAT. +// returned. func (h *BasicHost) Connect(ctx context.Context, pi pstore.PeerInfo) error { // absorb addresses into peerstore diff --git a/p2p/protocol/relay/relay.go b/p2p/protocol/relay/relay.go deleted file mode 100644 index 1256ff9343f87268c3515c96d7e5be64ea4e3658..0000000000000000000000000000000000000000 --- a/p2p/protocol/relay/relay.go +++ /dev/null @@ -1,163 +0,0 @@ -package relay - -import ( - "context" - "fmt" - "io" - "time" - - host "github.com/libp2p/go-libp2p-host" - - logging "github.com/ipfs/go-log" - inet "github.com/libp2p/go-libp2p-net" - peer "github.com/libp2p/go-libp2p-peer" - protocol "github.com/libp2p/go-libp2p-protocol" - mh "github.com/multiformats/go-multihash" -) - -var log = logging.Logger("protocol/relay") - -// ID is the protocol.ID of the Relay Service. -const ID protocol.ID = "/ipfs/relay/line/0.1.0" - -// Relay is a structure that implements ProtocolRelay. -// It is a simple relay service which forwards traffic -// between two directly connected peers. -// -// the protocol is very simple: -// -// /ipfs/relay\n -// -// -// -// -type RelayService struct { - host host.Host - handler inet.StreamHandler // for streams sent to us locally. -} - -func NewRelayService(h host.Host, sh inet.StreamHandler) *RelayService { - s := &RelayService{ - host: h, - handler: sh, - } - h.SetStreamHandler(ID, s.requestHandler) - return s -} - -// requestHandler is the function called by clients -func (rs *RelayService) requestHandler(s inet.Stream) { - if err := rs.handleStream(s); err != nil { - log.Debugf("RelayService error:", err) - } -} - -// handleStream is our own handler, which returns an error for simplicity. -func (rs *RelayService) handleStream(s inet.Stream) error { - defer s.Close() - - // read the header (src and dst peer.IDs) - src, dst, err := ReadHeader(s) - if err != nil { - return fmt.Errorf("stream with bad header: %s", err) - } - - local := rs.host.ID() - - switch { - case src == local: - return fmt.Errorf("relaying from self") - case dst == local: // it's for us! yaaay. - log.Debugf("%s consuming stream from %s", local, src) - return rs.consumeStream(s) - default: // src and dst are not local. relay it. - log.Debugf("%s relaying stream %s <--> %s", local, src, dst) - return rs.pipeStream(src, dst, s) - } -} - -// consumeStream connects streams directed to the local peer -// to our handler, with the header now stripped (read). -func (rs *RelayService) consumeStream(s inet.Stream) error { - rs.handler(s) // boom. - return nil -} - -// pipeStream relays over a stream to a remote peer. It's like `cat` -func (rs *RelayService) pipeStream(src, dst peer.ID, s inet.Stream) error { - // TODO: find a good way to pass contexts into here - nsctx, cancel := context.WithTimeout(context.TODO(), time.Second*30) - defer cancel() - - s2, err := rs.openStreamToPeer(nsctx, dst) - if err != nil { - return fmt.Errorf("failed to open stream to peer: %s -- %s", dst, err) - } - cancel() // cancel here because this function might last a while - - if err := WriteHeader(s2, src, dst); err != nil { - return err - } - - // connect the series of tubes. - done := make(chan retio, 2) - go func() { - n, err := io.Copy(s2, s) - done <- retio{n, err} - }() - go func() { - n, err := io.Copy(s, s2) - done <- retio{n, err} - }() - - r1 := <-done - r2 := <-done - log.Infof("%s relayed %d/%d bytes between %s and %s", rs.host.ID(), r1.n, r2.n, src, dst) - - if r1.err != nil { - return r1.err - } - return r2.err -} - -// openStreamToPeer opens a pipe to a remote endpoint -// for now, can only open streams to directly connected peers. -// maybe we can do some routing later on. -func (rs *RelayService) openStreamToPeer(ctx context.Context, p peer.ID) (inet.Stream, error) { - return rs.host.NewStream(ctx, p, ID) -} - -func ReadHeader(r io.Reader) (src, dst peer.ID, err error) { - - mhr := mh.NewReader(r) - - s, err := mhr.ReadMultihash() - if err != nil { - return "", "", err - } - - d, err := mhr.ReadMultihash() - if err != nil { - return "", "", err - } - - return peer.ID(s), peer.ID(d), nil -} - -func WriteHeader(w io.Writer, src, dst peer.ID) error { - // write header to w. - mhw := mh.NewWriter(w) - if err := mhw.WriteMultihash(mh.Multihash(src)); err != nil { - return fmt.Errorf("failed to write relay header: %s -- %s", dst, err) - } - if err := mhw.WriteMultihash(mh.Multihash(dst)); err != nil { - return fmt.Errorf("failed to write relay header: %s -- %s", dst, err) - } - - return nil -} - -type retio struct { - n int64 - err error -} diff --git a/p2p/protocol/relay/relay_test.go b/p2p/protocol/relay/relay_test.go deleted file mode 100644 index d1df7c20d46f04b194b929340ddcc95f8a1d5811..0000000000000000000000000000000000000000 --- a/p2p/protocol/relay/relay_test.go +++ /dev/null @@ -1,305 +0,0 @@ -package relay_test - -import ( - "context" - "io" - "testing" - - bhost "github.com/libp2p/go-libp2p/p2p/host/basic" - relay "github.com/libp2p/go-libp2p/p2p/protocol/relay" - - logging "github.com/ipfs/go-log" - inet "github.com/libp2p/go-libp2p-net" - testutil "github.com/libp2p/go-libp2p-netutil" - protocol "github.com/libp2p/go-libp2p-protocol" - msmux "github.com/multiformats/go-multistream" -) - -var log = logging.Logger("relay_test") - -func TestRelaySimple(t *testing.T) { - - ctx := context.Background() - - // these networks have the relay service wired in already. - n1 := bhost.New(testutil.GenSwarmNetwork(t, ctx)) - n2 := bhost.New(testutil.GenSwarmNetwork(t, ctx)) - n3 := bhost.New(testutil.GenSwarmNetwork(t, ctx)) - - n1p := n1.ID() - n2p := n2.ID() - n3p := n3.ID() - - n2pi := n2.Peerstore().PeerInfo(n2p) - if err := n1.Connect(ctx, n2pi); err != nil { - t.Fatal("Failed to connect:", err) - } - if err := n3.Connect(ctx, n2pi); err != nil { - t.Fatal("Failed to connect:", err) - } - - // setup handler on n3 to copy everything over to the pipe. - piper, pipew := io.Pipe() - n3.SetStreamHandler(protocol.TestingID, func(s inet.Stream) { - log.Debug("relay stream opened to n3!") - log.Debug("piping and echoing everything") - w := io.MultiWriter(s, pipew) - io.Copy(w, s) - log.Debug("closing stream") - s.Close() - }) - - // ok, now we can try to relay n1--->n2--->n3. - log.Debug("open relay stream") - s, err := n1.NewStream(ctx, n2p, relay.ID) - if err != nil { - t.Fatal(err) - } - - // ok first thing we write the relay header n1->n3 - log.Debug("write relay header") - if err := relay.WriteHeader(s, n1p, n3p); err != nil { - t.Fatal(err) - } - - // ok now the header's there, we can write the next protocol header. - log.Debug("write testing header") - if err := msmux.SelectProtoOrFail(string(protocol.TestingID), s); err != nil { - t.Fatal(err) - } - - // okay, now we should be able to write text, and read it out. - buf1 := []byte("abcdefghij") - buf2 := make([]byte, 10) - buf3 := make([]byte, 10) - log.Debug("write in some text.") - if _, err := s.Write(buf1); err != nil { - t.Fatal(err) - } - - // read it out from the pipe. - log.Debug("read it out from the pipe.") - if _, err := io.ReadFull(piper, buf2); err != nil { - t.Fatal(err) - } - if string(buf1) != string(buf2) { - t.Fatal("should've gotten that text out of the pipe") - } - - // read it out from the stream (echoed) - log.Debug("read it out from the stream (echoed).") - if _, err := io.ReadFull(s, buf3); err != nil { - t.Fatal(err) - } - if string(buf1) != string(buf3) { - t.Fatal("should've gotten that text out of the stream") - } - - // sweet. relay works. - log.Debug("sweet, relay works.") - s.Close() -} - -func TestRelayAcrossFour(t *testing.T) { - - ctx := context.Background() - - // these networks have the relay service wired in already. - n1 := bhost.New(testutil.GenSwarmNetwork(t, ctx)) - n2 := bhost.New(testutil.GenSwarmNetwork(t, ctx)) - n3 := bhost.New(testutil.GenSwarmNetwork(t, ctx)) - n4 := bhost.New(testutil.GenSwarmNetwork(t, ctx)) - n5 := bhost.New(testutil.GenSwarmNetwork(t, ctx)) - - n1p := n1.ID() - n2p := n2.ID() - n3p := n3.ID() - n4p := n4.ID() - n5p := n5.ID() - - n2pi := n2.Peerstore().PeerInfo(n2p) - n4pi := n4.Peerstore().PeerInfo(n4p) - - if err := n1.Connect(ctx, n2pi); err != nil { - t.Fatalf("Failed to dial:", err) - } - if err := n3.Connect(ctx, n2pi); err != nil { - t.Fatalf("Failed to dial:", err) - } - if err := n3.Connect(ctx, n4pi); err != nil { - t.Fatalf("Failed to dial:", err) - } - if err := n5.Connect(ctx, n4pi); err != nil { - t.Fatalf("Failed to dial:", err) - } - - // setup handler on n5 to copy everything over to the pipe. - piper, pipew := io.Pipe() - n5.SetStreamHandler(protocol.TestingID, func(s inet.Stream) { - log.Debug("relay stream opened to n5!") - log.Debug("piping and echoing everything") - w := io.MultiWriter(s, pipew) - io.Copy(w, s) - log.Debug("closing stream") - s.Close() - }) - - // ok, now we can try to relay n1--->n2--->n3--->n4--->n5 - log.Debug("open relay stream") - s, err := n1.NewStream(ctx, n2p, relay.ID) - if err != nil { - t.Fatal(err) - } - - log.Debugf("write relay header n1->n3 (%s -> %s)", n1p, n3p) - if err := relay.WriteHeader(s, n1p, n3p); err != nil { - t.Fatal(err) - } - - log.Debugf("write relay header n1->n4 (%s -> %s)", n1p, n4p) - if err := msmux.SelectProtoOrFail(string(relay.ID), s); err != nil { - t.Fatal(err) - } - if err := relay.WriteHeader(s, n1p, n4p); err != nil { - t.Fatal(err) - } - - log.Debugf("write relay header n1->n5 (%s -> %s)", n1p, n5p) - if err := msmux.SelectProtoOrFail(string(relay.ID), s); err != nil { - t.Fatal(err) - } - if err := relay.WriteHeader(s, n1p, n5p); err != nil { - t.Fatal(err) - } - - // ok now the header's there, we can write the next protocol header. - log.Debug("write testing header") - if err := msmux.SelectProtoOrFail(string(protocol.TestingID), s); err != nil { - t.Fatal(err) - } - - // okay, now we should be able to write text, and read it out. - buf1 := []byte("abcdefghij") - buf2 := make([]byte, 10) - buf3 := make([]byte, 10) - log.Debug("write in some text.") - if _, err := s.Write(buf1); err != nil { - t.Fatal(err) - } - - // read it out from the pipe. - log.Debug("read it out from the pipe.") - if _, err := io.ReadFull(piper, buf2); err != nil { - t.Fatal(err) - } - if string(buf1) != string(buf2) { - t.Fatal("should've gotten that text out of the pipe") - } - - // read it out from the stream (echoed) - log.Debug("read it out from the stream (echoed).") - if _, err := io.ReadFull(s, buf3); err != nil { - t.Fatal(err) - } - if string(buf1) != string(buf3) { - t.Fatal("should've gotten that text out of the stream") - } - - // sweet. relay works. - log.Debug("sweet, relaying across 4 works.") - s.Close() -} - -func TestRelayStress(t *testing.T) { - buflen := 1 << 18 - iterations := 10 - - ctx := context.Background() - - // these networks have the relay service wired in already. - n1 := bhost.New(testutil.GenSwarmNetwork(t, ctx)) - n2 := bhost.New(testutil.GenSwarmNetwork(t, ctx)) - n3 := bhost.New(testutil.GenSwarmNetwork(t, ctx)) - - n1p := n1.ID() - n2p := n2.ID() - n3p := n3.ID() - - n2pi := n2.Peerstore().PeerInfo(n2p) - if err := n1.Connect(ctx, n2pi); err != nil { - t.Fatalf("Failed to dial:", err) - } - if err := n3.Connect(ctx, n2pi); err != nil { - t.Fatalf("Failed to dial:", err) - } - - // setup handler on n3 to copy everything over to the pipe. - piper, pipew := io.Pipe() - n3.SetStreamHandler(protocol.TestingID, func(s inet.Stream) { - log.Debug("relay stream opened to n3!") - log.Debug("piping and echoing everything") - w := io.MultiWriter(s, pipew) - io.Copy(w, s) - log.Debug("closing stream") - s.Close() - }) - - // ok, now we can try to relay n1--->n2--->n3. - log.Debug("open relay stream") - s, err := n1.NewStream(ctx, n2p, relay.ID) - if err != nil { - t.Fatal(err) - } - - // ok first thing we write the relay header n1->n3 - log.Debug("write relay header") - if err := relay.WriteHeader(s, n1p, n3p); err != nil { - t.Fatal(err) - } - - // ok now the header's there, we can write the next protocol header. - log.Debug("write testing header") - if err := msmux.SelectProtoOrFail(string(protocol.TestingID), s); err != nil { - t.Fatal(err) - } - - // okay, now write lots of text and read it back out from both - // the pipe and the stream. - buf1 := make([]byte, buflen) - buf2 := make([]byte, len(buf1)) - buf3 := make([]byte, len(buf1)) - - fillbuf := func(buf []byte, b byte) { - for i := range buf { - buf[i] = b - } - } - - for i := 0; i < iterations; i++ { - fillbuf(buf1, byte(int('a')+i)) - log.Debugf("writing %d bytes (%d/%d)", len(buf1), i, iterations) - if _, err := s.Write(buf1); err != nil { - t.Fatal(err) - } - - log.Debug("read it out from the pipe.") - if _, err := io.ReadFull(piper, buf2); err != nil { - t.Fatal(err) - } - if string(buf1) != string(buf2) { - t.Fatal("should've gotten that text out of the pipe") - } - - // read it out from the stream (echoed) - log.Debug("read it out from the stream (echoed).") - if _, err := io.ReadFull(s, buf3); err != nil { - t.Fatal(err) - } - if string(buf1) != string(buf3) { - t.Fatal("should've gotten that text out of the stream") - } - } - - log.Debug("sweet, relay works under stress.") - s.Close() -}