diff --git a/examples/justtcp/README.md b/examples/justtcp/README.md new file mode 100644 index 0000000000000000000000000000000000000000..9c6b486e90bdadffba41650a61dd1bacbdb316ed --- /dev/null +++ b/examples/justtcp/README.md @@ -0,0 +1,16 @@ +# libp2p 'just tcp' example + +## What this does +This example starts up a libp2p swarm that listens for tcp connections behind a +multistream muxer protocol of `/plaintext/1.0.0`. All connections made to it +will be echoed back. + +## Building +``` +$ go build +``` + +## Usage +``` +$ ./justtcp +``` diff --git a/examples/justtcp/main.go b/examples/justtcp/main.go new file mode 100644 index 0000000000000000000000000000000000000000..41609dc222d62df8cb0f5445302c77b3b58f6669 --- /dev/null +++ b/examples/justtcp/main.go @@ -0,0 +1,85 @@ +package main + +import ( + "context" + "fmt" + "net" + "os" + + transport "github.com/ipfs/go-libp2p-transport" + ma "github.com/jbenet/go-multiaddr" + smux "github.com/jbenet/go-stream-muxer" + "github.com/libp2p/go-libp2p/p2p/net/swarm" +) + +func fatal(i interface{}) { + fmt.Println(i) + os.Exit(1) +} + +type NullMux struct{} + +type NullMuxConn struct { + net.Conn +} + +func (c *NullMuxConn) AcceptStream() (smux.Stream, error) { + panic("We don't do this") +} + +func (c *NullMuxConn) IsClosed() bool { + return false +} + +func (c *NullMuxConn) OpenStream() (smux.Stream, error) { + panic("if only you could see how disappointed i am in you right now") +} + +func (c *NullMuxConn) Serve(_ smux.StreamHandler) { +} + +func (nm NullMux) NewConn(c net.Conn, server bool) (smux.Conn, error) { + return &NullMuxConn{c}, nil +} + +var _ smux.Transport = (*NullMux)(nil) + +func main() { + laddr, err := ma.NewMultiaddr("/ip4/0.0.0.0/tcp/5555") + if err != nil { + fatal(err) + } + + swarm.PSTransport = new(NullMux) + + s := swarm.NewBlankSwarm(context.Background(), "bob", nil) + + s.AddTransport(transport.NewTCPTransport()) + + err = s.AddListenAddr(laddr) + if err != nil { + fatal(err) + } + + s.SetConnHandler(func(c *swarm.Conn) { + fmt.Println("CALLED OUR CONN HANDLER!") + defer c.Close() + buf := make([]byte, 1024) + for { + n, err := c.RawConn().Read(buf) + if err != nil { + fmt.Println(err) + return + } + fmt.Printf("read: %q\n", string(buf[:n])) + + _, err = c.RawConn().Write(buf[:n]) + if err != nil { + fmt.Println(err) + return + } + } + }) + + <-make(chan bool) +} diff --git a/p2p/net/conn/dial.go b/p2p/net/conn/dial.go index 6b29492bcf8a234d028dc2ee78c5a18c03d5af3f..2c55f31434b6051aaa53db34fde3ede97f051ba9 100644 --- a/p2p/net/conn/dial.go +++ b/p2p/net/conn/dial.go @@ -65,7 +65,7 @@ func (d *Dialer) Dial(ctx context.Context, raddr ma.Multiaddr, remote peer.ID) ( } cryptoProtoChoice := SecioTag - if !EncryptConnections { + if !EncryptConnections || d.PrivateKey == nil { cryptoProtoChoice = NoEncryptionTag } diff --git a/p2p/net/conn/dial_test.go b/p2p/net/conn/dial_test.go index 762113e83394c0fdcadc48f7e39f865f14d9daf5..2d73c6f24a584fde0d2dcbe4e17f8856f22f8579 100644 --- a/p2p/net/conn/dial_test.go +++ b/p2p/net/conn/dial_test.go @@ -259,8 +259,8 @@ func testDialerCloseEarly(t *testing.T, secure bool) { // lol nesting d2 := &Dialer{ - LocalPeer: p2.ID, - // PrivateKey: key2, -- dont give it key. we'll just close the conn. + LocalPeer: p2.ID, + PrivateKey: p2.PrivKey, //-- dont give it key. we'll just close the conn. } d2.AddDialer(dialer(t, p2.Addr)) @@ -527,7 +527,6 @@ func TestConcurrentAccept(t *testing.T) { err = grc.CheckForLeaks(goroFilter) if err != nil { - panic(err) t.Fatal(err) } } @@ -644,7 +643,6 @@ func TestConnectionTimeouts(t *testing.T) { err = grc.CheckForLeaks(goroFilter) if err != nil { - panic(err) t.Fatal(err) } } diff --git a/p2p/net/conn/listen.go b/p2p/net/conn/listen.go index 1a7805b6dcd4649e60ea2ca1701d89f4a97c55d8..ffb883bb063ec51b993e0ace30c8ca9fccca37a2 100644 --- a/p2p/net/conn/listen.go +++ b/p2p/net/conn/listen.go @@ -211,7 +211,7 @@ func WrapTransportListener(ctx context.Context, ml transport.Listener, local pee return false } - if EncryptConnections { + if EncryptConnections && sk != nil { l.mux.AddHandler(SecioTag, nil) } else { l.mux.AddHandler(NoEncryptionTag, nil) diff --git a/p2p/net/swarm/swarm.go b/p2p/net/swarm/swarm.go index c342f411c40a640343acb17b1324cfcede357058..05402f5052ea1e44d13457715fc62414039f9bf6 100644 --- a/p2p/net/swarm/swarm.go +++ b/p2p/net/swarm/swarm.go @@ -10,6 +10,14 @@ import ( "sync" "time" + metrics "github.com/libp2p/go-libp2p/p2p/metrics" + mconn "github.com/libp2p/go-libp2p/p2p/metrics/conn" + inet "github.com/libp2p/go-libp2p/p2p/net" + conn "github.com/libp2p/go-libp2p/p2p/net/conn" + filter "github.com/libp2p/go-libp2p/p2p/net/filter" + addrutil "github.com/libp2p/go-libp2p/p2p/net/swarm/addr" + + ci "github.com/ipfs/go-libp2p-crypto" peer "github.com/ipfs/go-libp2p-peer" pstore "github.com/ipfs/go-libp2p-peerstore" transport "github.com/ipfs/go-libp2p-transport" @@ -19,12 +27,6 @@ import ( pst "github.com/jbenet/go-stream-muxer" "github.com/jbenet/goprocess" goprocessctx "github.com/jbenet/goprocess/context" - metrics "github.com/libp2p/go-libp2p/p2p/metrics" - mconn "github.com/libp2p/go-libp2p/p2p/metrics/conn" - inet "github.com/libp2p/go-libp2p/p2p/net" - conn "github.com/libp2p/go-libp2p/p2p/net/conn" - filter "github.com/libp2p/go-libp2p/p2p/net/filter" - addrutil "github.com/libp2p/go-libp2p/p2p/net/swarm/addr" psmss "github.com/whyrusleeping/go-smux-multistream" spdy "github.com/whyrusleeping/go-smux-spdystream" yamux "github.com/whyrusleeping/go-smux-yamux" @@ -143,6 +145,31 @@ func NewSwarm(ctx context.Context, listenAddrs []ma.Multiaddr, return s, nil } +func NewBlankSwarm(ctx context.Context, id peer.ID, privkey ci.PrivKey) *Swarm { + s := &Swarm{ + swarm: ps.NewSwarm(PSTransport), + local: id, + peers: pstore.NewPeerstore(), + ctx: ctx, + dialT: DialTimeout, + notifs: make(map[inet.Notifiee]ps.Notifiee), + fdRateLimit: make(chan struct{}, concurrentFdDials), + Filters: filter.NewFilters(), + dialer: conn.NewDialer(id, privkey, nil), + } + + // configure Swarm + s.limiter = newDialLimiter(s.dialAddr) + s.proc = goprocessctx.WithContextAndTeardown(ctx, s.teardown) + s.SetConnHandler(nil) // make sure to setup our own conn handler. + + return s +} + +func (s *Swarm) AddTransport(t transport.Transport) { + s.transports = append(s.transports, t) +} + func (s *Swarm) teardown() error { return s.swarm.Close() } diff --git a/p2p/net/swarm/swarm_listen.go b/p2p/net/swarm/swarm_listen.go index 64fdb83bd094ff904d9938ad08b854dd8c372103..4eb61d5a415c5cbd036f3660f8469fc47b64476e 100644 --- a/p2p/net/swarm/swarm_listen.go +++ b/p2p/net/swarm/swarm_listen.go @@ -13,37 +13,42 @@ import ( context "golang.org/x/net/context" ) +func (s *Swarm) AddListenAddr(a ma.Multiaddr) error { + tpt := s.transportForAddr(a) + if tpt == nil { + return fmt.Errorf("no transport for address: %s", a) + } + + d, err := tpt.Dialer(a, transport.TimeoutOpt(DialTimeout), transport.ReusePorts) + if err != nil { + return err + } + + s.dialer.AddDialer(d) + + list, err := tpt.Listen(a) + if err != nil { + return err + } + + err = s.addListener(list) + if err != nil { + return err + } + + return nil +} + // Open listeners and reuse-dialers for the given addresses func (s *Swarm) setupInterfaces(addrs []ma.Multiaddr) error { errs := make([]error, len(addrs)) var succeeded int for i, a := range addrs { - tpt := s.transportForAddr(a) - if tpt == nil { - errs[i] = fmt.Errorf("no transport for address: %s", a) - continue - } - - d, err := tpt.Dialer(a, transport.TimeoutOpt(DialTimeout), transport.ReusePorts) - if err != nil { - errs[i] = err - continue - } - - s.dialer.AddDialer(d) - - list, err := tpt.Listen(a) - if err != nil { + if err := s.AddListenAddr(a); err != nil { errs[i] = err - continue + } else { + succeeded++ } - - err = s.addListener(list) - if err != nil { - errs[i] = err - continue - } - succeeded++ } for i, e := range errs { @@ -51,6 +56,7 @@ func (s *Swarm) setupInterfaces(addrs []ma.Multiaddr) error { log.Warning("listen on %s failed: %s", addrs[i], errs[i]) } } + if succeeded == 0 && len(addrs) > 0 { return fmt.Errorf("failed to listen on any addresses: %s", errs) } @@ -83,7 +89,7 @@ func (s *Swarm) addListener(tptlist transport.Listener) error { list.SetAddrFilters(s.Filters) - if cw, ok := list.(conn.ListenerConnWrapper); ok { + if cw, ok := list.(conn.ListenerConnWrapper); ok && s.bwc != nil { cw.SetConnWrapper(func(c transport.Conn) transport.Conn { return mconn.WrapConn(s.bwc, c) })