From 1b84a1cc9d22dbe3fb2a43c3474fca9e3da8471c Mon Sep 17 00:00:00 2001 From: Jeromy Date: Fri, 4 Mar 2016 09:27:52 -0800 Subject: [PATCH] Add fallback dialer Previously we were unable to dial on a given transport if there were no listener addresses defined for that. This meant that most people couldnt use utp (as they had no utp listener transport to dial from). --- p2p/net/conn/dial.go | 5 +++ p2p/net/conn/interface.go | 2 ++ p2p/net/swarm/dial_test.go | 55 ++++++++++++++++++++++++++++++ p2p/net/swarm/swarm_test.go | 25 +++++++++++--- p2p/net/transport/fallback.go | 63 +++++++++++++++++++++++++++++++++++ 5 files changed, 146 insertions(+), 4 deletions(-) create mode 100644 p2p/net/transport/fallback.go diff --git a/p2p/net/conn/dial.go b/p2p/net/conn/dial.go index 00b3566..3d0101d 100644 --- a/p2p/net/conn/dial.go +++ b/p2p/net/conn/dial.go @@ -22,6 +22,7 @@ func NewDialer(p peer.ID, pk ci.PrivKey, wrap WrapFunc) *Dialer { LocalPeer: p, PrivateKey: pk, Wrapper: wrap, + fallback: new(transport.FallbackDialer), } } @@ -115,6 +116,10 @@ func (d *Dialer) subDialerForAddr(raddr ma.Multiaddr) transport.Dialer { } } + if d.fallback.Matches(raddr) { + return d.fallback + } + return nil } diff --git a/p2p/net/conn/interface.go b/p2p/net/conn/interface.go index 3da7e9a..babc2ff 100644 --- a/p2p/net/conn/interface.go +++ b/p2p/net/conn/interface.go @@ -65,6 +65,8 @@ type Dialer struct { // Wrapper to wrap the raw connection (optional) Wrapper WrapFunc + + fallback transport.Dialer } // Listener is an object that can accept connections. It matches net.Listener diff --git a/p2p/net/swarm/dial_test.go b/p2p/net/swarm/dial_test.go index 66bac55..a6bc84e 100644 --- a/p2p/net/swarm/dial_test.go +++ b/p2p/net/swarm/dial_test.go @@ -18,6 +18,61 @@ import ( ma "gx/ipfs/QmcobAGsCjYt5DXoq9et9L8yR8er7o7Cu3DTvpaq12jYSz/go-multiaddr" ) +func closeSwarms(swarms []*Swarm) { + for _, s := range swarms { + s.Close() + } +} + +func TestBasicDial(t *testing.T) { + t.Parallel() + ctx := context.Background() + + swarms := makeSwarms(ctx, t, 2) + defer closeSwarms(swarms) + s1 := swarms[0] + s2 := swarms[1] + + s1.peers.AddAddrs(s2.local, s2.ListenAddresses(), peer.PermanentAddrTTL) + + c, err := s1.Dial(ctx, s2.local) + if err != nil { + t.Fatal(err) + } + + s, err := c.NewStream() + if err != nil { + t.Fatal(err) + } + + s.Close() +} + +func TestDialWithNoListeners(t *testing.T) { + t.Parallel() + ctx := context.Background() + + s1 := makeDialOnlySwarm(ctx, t) + + swarms := makeSwarms(ctx, t, 1) + defer closeSwarms(swarms) + s2 := swarms[0] + + s1.peers.AddAddrs(s2.local, s2.ListenAddresses(), peer.PermanentAddrTTL) + + c, err := s1.Dial(ctx, s2.local) + if err != nil { + t.Fatal(err) + } + + s, err := c.NewStream() + if err != nil { + t.Fatal(err) + } + + s.Close() +} + func acceptAndHang(l net.Listener) { conns := make([]net.Conn, 0, 10) for { diff --git a/p2p/net/swarm/swarm_test.go b/p2p/net/swarm/swarm_test.go index 121d1b0..fe2ef2a 100644 --- a/p2p/net/swarm/swarm_test.go +++ b/p2p/net/swarm/swarm_test.go @@ -24,31 +24,48 @@ func EchoStreamHandler(stream inet.Stream) { // pull out the ipfs conn c := stream.Conn() - log.Infof("%s ponging to %s", c.LocalPeer(), c.RemotePeer()) + log.Errorf("%s ponging to %s", c.LocalPeer(), c.RemotePeer()) buf := make([]byte, 4) for { if _, err := stream.Read(buf); err != nil { if err != io.EOF { - log.Info("ping receive error:", err) + log.Error("ping receive error:", err) } return } if !bytes.Equal(buf, []byte("ping")) { - log.Infof("ping receive error: ping != %s %v", buf, buf) + log.Errorf("ping receive error: ping != %s %v", buf, buf) return } if _, err := stream.Write([]byte("pong")); err != nil { - log.Info("pond send error:", err) + log.Error("pond send error:", err) return } } }() } +func makeDialOnlySwarm(ctx context.Context, t *testing.T) *Swarm { + id := testutil.RandIdentityOrFatal(t) + + peerstore := peer.NewPeerstore() + peerstore.AddPubKey(id.ID(), id.PublicKey()) + peerstore.AddPrivKey(id.ID(), id.PrivateKey()) + + swarm, err := NewSwarm(ctx, nil, id.ID(), peerstore, metrics.NewBandwidthCounter()) + if err != nil { + t.Fatal(err) + } + + swarm.SetStreamHandler(EchoStreamHandler) + + return swarm +} + func makeSwarms(ctx context.Context, t *testing.T, num int) []*Swarm { swarms := make([]*Swarm, 0, num) diff --git a/p2p/net/transport/fallback.go b/p2p/net/transport/fallback.go new file mode 100644 index 0000000..ca8afe2 --- /dev/null +++ b/p2p/net/transport/fallback.go @@ -0,0 +1,63 @@ +package transport + +import ( + "fmt" + + manet "gx/ipfs/QmQB7mNP3QE7b4zP2MQmsyJDqG5hzYE2CL8k1VyLWky2Ed/go-multiaddr-net" + mautp "gx/ipfs/QmQB7mNP3QE7b4zP2MQmsyJDqG5hzYE2CL8k1VyLWky2Ed/go-multiaddr-net/utp" + utp "gx/ipfs/QmVs3wq4cN64TFCxANzgSHjGPrjMnRnwPrxU8bqc7YP42s/utp" + mafmt "gx/ipfs/QmWLfU4tstw2aNcTykDm44xbSTCYJ9pUJwfhQCKGwckcHx/mafmt" + ma "gx/ipfs/QmcobAGsCjYt5DXoq9et9L8yR8er7o7Cu3DTvpaq12jYSz/go-multiaddr" +) + +type FallbackDialer struct { + madialer manet.Dialer +} + +func (fbd *FallbackDialer) Matches(a ma.Multiaddr) bool { + return mafmt.TCP.Matches(a) || mafmt.UTP.Matches(a) +} + +func (fbd *FallbackDialer) Dial(a ma.Multiaddr) (Conn, error) { + if mafmt.TCP.Matches(a) { + return fbd.tcpDial(a) + } + if mafmt.UTP.Matches(a) { + } + return nil, fmt.Errorf("cannot dial %s with fallback dialer", a) +} + +func (fbd *FallbackDialer) tcpDial(raddr ma.Multiaddr) (Conn, error) { + var c manet.Conn + var err error + c, err = fbd.madialer.Dial(raddr) + + if err != nil { + return nil, err + } + + return &connWrap{ + Conn: c, + }, nil +} + +func (fbd *FallbackDialer) utpDial(raddr ma.Multiaddr) (Conn, error) { + _, addr, err := manet.DialArgs(raddr) + if err != nil { + return nil, err + } + + con, err := utp.Dial(addr) + if err != nil { + return nil, err + } + + mnc, err := manet.WrapNetConn(&mautp.Conn{Conn: con}) + if err != nil { + return nil, err + } + + return &connWrap{ + Conn: mnc, + }, nil +} -- GitLab