diff --git a/p2p/net/conn/dial.go b/p2p/net/conn/dial.go index 00b3566fd77d698983c8a3b27131114d123c2c1b..3d0101da9d903251c07e6911bca2184271f7ec6a 100644 --- a/p2p/net/conn/dial.go +++ b/p2p/net/conn/dial.go @@ -22,6 +22,7 @@ func NewDialer(p peer.ID, pk ci.PrivKey, wrap WrapFunc) *Dialer { LocalPeer: p, PrivateKey: pk, Wrapper: wrap, + fallback: new(transport.FallbackDialer), } } @@ -115,6 +116,10 @@ func (d *Dialer) subDialerForAddr(raddr ma.Multiaddr) transport.Dialer { } } + if d.fallback.Matches(raddr) { + return d.fallback + } + return nil } diff --git a/p2p/net/conn/interface.go b/p2p/net/conn/interface.go index 3da7e9aea908db4de07391937a36e2500f11f49a..babc2ff22151289bb9ba23dd8d821fe1fce91bc8 100644 --- a/p2p/net/conn/interface.go +++ b/p2p/net/conn/interface.go @@ -65,6 +65,8 @@ type Dialer struct { // Wrapper to wrap the raw connection (optional) Wrapper WrapFunc + + fallback transport.Dialer } // Listener is an object that can accept connections. It matches net.Listener diff --git a/p2p/net/swarm/dial_test.go b/p2p/net/swarm/dial_test.go index 66bac550a812b96357a81fac5351e47e9d758f01..a6bc84e8615ee164e89b226c63598555ce43b962 100644 --- a/p2p/net/swarm/dial_test.go +++ b/p2p/net/swarm/dial_test.go @@ -18,6 +18,61 @@ import ( ma "gx/ipfs/QmcobAGsCjYt5DXoq9et9L8yR8er7o7Cu3DTvpaq12jYSz/go-multiaddr" ) +func closeSwarms(swarms []*Swarm) { + for _, s := range swarms { + s.Close() + } +} + +func TestBasicDial(t *testing.T) { + t.Parallel() + ctx := context.Background() + + swarms := makeSwarms(ctx, t, 2) + defer closeSwarms(swarms) + s1 := swarms[0] + s2 := swarms[1] + + s1.peers.AddAddrs(s2.local, s2.ListenAddresses(), peer.PermanentAddrTTL) + + c, err := s1.Dial(ctx, s2.local) + if err != nil { + t.Fatal(err) + } + + s, err := c.NewStream() + if err != nil { + t.Fatal(err) + } + + s.Close() +} + +func TestDialWithNoListeners(t *testing.T) { + t.Parallel() + ctx := context.Background() + + s1 := makeDialOnlySwarm(ctx, t) + + swarms := makeSwarms(ctx, t, 1) + defer closeSwarms(swarms) + s2 := swarms[0] + + s1.peers.AddAddrs(s2.local, s2.ListenAddresses(), peer.PermanentAddrTTL) + + c, err := s1.Dial(ctx, s2.local) + if err != nil { + t.Fatal(err) + } + + s, err := c.NewStream() + if err != nil { + t.Fatal(err) + } + + s.Close() +} + func acceptAndHang(l net.Listener) { conns := make([]net.Conn, 0, 10) for { diff --git a/p2p/net/swarm/swarm_test.go b/p2p/net/swarm/swarm_test.go index 121d1b0f35069ca6bf1be9280cd53ca6fc544114..fe2ef2a207bf5743bb6122627ee68a8d1a458323 100644 --- a/p2p/net/swarm/swarm_test.go +++ b/p2p/net/swarm/swarm_test.go @@ -24,31 +24,48 @@ func EchoStreamHandler(stream inet.Stream) { // pull out the ipfs conn c := stream.Conn() - log.Infof("%s ponging to %s", c.LocalPeer(), c.RemotePeer()) + log.Errorf("%s ponging to %s", c.LocalPeer(), c.RemotePeer()) buf := make([]byte, 4) for { if _, err := stream.Read(buf); err != nil { if err != io.EOF { - log.Info("ping receive error:", err) + log.Error("ping receive error:", err) } return } if !bytes.Equal(buf, []byte("ping")) { - log.Infof("ping receive error: ping != %s %v", buf, buf) + log.Errorf("ping receive error: ping != %s %v", buf, buf) return } if _, err := stream.Write([]byte("pong")); err != nil { - log.Info("pond send error:", err) + log.Error("pond send error:", err) return } } }() } +func makeDialOnlySwarm(ctx context.Context, t *testing.T) *Swarm { + id := testutil.RandIdentityOrFatal(t) + + peerstore := peer.NewPeerstore() + peerstore.AddPubKey(id.ID(), id.PublicKey()) + peerstore.AddPrivKey(id.ID(), id.PrivateKey()) + + swarm, err := NewSwarm(ctx, nil, id.ID(), peerstore, metrics.NewBandwidthCounter()) + if err != nil { + t.Fatal(err) + } + + swarm.SetStreamHandler(EchoStreamHandler) + + return swarm +} + func makeSwarms(ctx context.Context, t *testing.T, num int) []*Swarm { swarms := make([]*Swarm, 0, num) diff --git a/p2p/net/transport/fallback.go b/p2p/net/transport/fallback.go new file mode 100644 index 0000000000000000000000000000000000000000..ca8afe2dfb40fcd33ed9f0e5c4557be2a1a08c58 --- /dev/null +++ b/p2p/net/transport/fallback.go @@ -0,0 +1,63 @@ +package transport + +import ( + "fmt" + + manet "gx/ipfs/QmQB7mNP3QE7b4zP2MQmsyJDqG5hzYE2CL8k1VyLWky2Ed/go-multiaddr-net" + mautp "gx/ipfs/QmQB7mNP3QE7b4zP2MQmsyJDqG5hzYE2CL8k1VyLWky2Ed/go-multiaddr-net/utp" + utp "gx/ipfs/QmVs3wq4cN64TFCxANzgSHjGPrjMnRnwPrxU8bqc7YP42s/utp" + mafmt "gx/ipfs/QmWLfU4tstw2aNcTykDm44xbSTCYJ9pUJwfhQCKGwckcHx/mafmt" + ma "gx/ipfs/QmcobAGsCjYt5DXoq9et9L8yR8er7o7Cu3DTvpaq12jYSz/go-multiaddr" +) + +type FallbackDialer struct { + madialer manet.Dialer +} + +func (fbd *FallbackDialer) Matches(a ma.Multiaddr) bool { + return mafmt.TCP.Matches(a) || mafmt.UTP.Matches(a) +} + +func (fbd *FallbackDialer) Dial(a ma.Multiaddr) (Conn, error) { + if mafmt.TCP.Matches(a) { + return fbd.tcpDial(a) + } + if mafmt.UTP.Matches(a) { + } + return nil, fmt.Errorf("cannot dial %s with fallback dialer", a) +} + +func (fbd *FallbackDialer) tcpDial(raddr ma.Multiaddr) (Conn, error) { + var c manet.Conn + var err error + c, err = fbd.madialer.Dial(raddr) + + if err != nil { + return nil, err + } + + return &connWrap{ + Conn: c, + }, nil +} + +func (fbd *FallbackDialer) utpDial(raddr ma.Multiaddr) (Conn, error) { + _, addr, err := manet.DialArgs(raddr) + if err != nil { + return nil, err + } + + con, err := utp.Dial(addr) + if err != nil { + return nil, err + } + + mnc, err := manet.WrapNetConn(&mautp.Conn{Conn: con}) + if err != nil { + return nil, err + } + + return &connWrap{ + Conn: mnc, + }, nil +}