Commit 3d31b833 authored by Jeromy Johnson's avatar Jeromy Johnson Committed by GitHub
Browse files

Merge pull request #129 from libp2p/feat/extracting-2

extract conn, addr-util, and testutil
parents 0aaec876 0f3ffb2d
......@@ -8,16 +8,16 @@ import (
"log"
"strings"
peer "github.com/ipfs/go-libp2p-peer"
pstore "github.com/ipfs/go-libp2p-peerstore"
host "github.com/libp2p/go-libp2p/p2p/host"
bhost "github.com/libp2p/go-libp2p/p2p/host/basic"
inet "github.com/libp2p/go-libp2p/p2p/net"
net "github.com/libp2p/go-libp2p/p2p/net"
swarm "github.com/libp2p/go-libp2p/p2p/net/swarm"
testutil "github.com/libp2p/go-libp2p/testutil"
peer "github.com/ipfs/go-libp2p-peer"
pstore "github.com/ipfs/go-libp2p-peerstore"
ma "github.com/jbenet/go-multiaddr"
testutil "github.com/libp2p/go-testutil"
)
// create a 'Host' with a random peer to listen on the given address
......
package conn
import (
"context"
"fmt"
"io"
"net"
"time"
u "github.com/ipfs/go-ipfs-util"
ic "github.com/ipfs/go-libp2p-crypto"
lgbl "github.com/ipfs/go-libp2p-loggables"
peer "github.com/ipfs/go-libp2p-peer"
logging "github.com/ipfs/go-log"
mpool "github.com/jbenet/go-msgio/mpool"
ma "github.com/jbenet/go-multiaddr"
manet "github.com/jbenet/go-multiaddr-net"
)
var log = logging.Logger("conn")
// ReleaseBuffer puts the given byte array back into the buffer pool,
// first verifying that it is the correct size
func ReleaseBuffer(b []byte) {
log.Debugf("Releasing buffer! (cap,size = %d, %d)", cap(b), len(b))
mpool.ByteSlicePool.Put(uint32(cap(b)), b)
}
// singleConn represents a single connection to another Peer (IPFS Node).
type singleConn struct {
local peer.ID
remote peer.ID
maconn manet.Conn
event io.Closer
}
// newConn constructs a new connection
func newSingleConn(ctx context.Context, local, remote peer.ID, maconn manet.Conn) (Conn, error) {
ml := lgbl.Dial("conn", local, remote, maconn.LocalMultiaddr(), maconn.RemoteMultiaddr())
conn := &singleConn{
local: local,
remote: remote,
maconn: maconn,
event: log.EventBegin(ctx, "connLifetime", ml),
}
log.Debugf("newSingleConn %p: %v to %v", conn, local, remote)
return conn, nil
}
// close is the internal close function, called by ContextCloser.Close
func (c *singleConn) Close() error {
defer func() {
if c.event != nil {
c.event.Close()
c.event = nil
}
}()
// close underlying connection
return c.maconn.Close()
}
// ID is an identifier unique to this connection.
func (c *singleConn) ID() string {
return ID(c)
}
func (c *singleConn) String() string {
return String(c, "singleConn")
}
func (c *singleConn) LocalAddr() net.Addr {
return c.maconn.LocalAddr()
}
func (c *singleConn) RemoteAddr() net.Addr {
return c.maconn.RemoteAddr()
}
func (c *singleConn) LocalPrivateKey() ic.PrivKey {
return nil
}
func (c *singleConn) RemotePublicKey() ic.PubKey {
return nil
}
func (c *singleConn) SetDeadline(t time.Time) error {
return c.maconn.SetDeadline(t)
}
func (c *singleConn) SetReadDeadline(t time.Time) error {
return c.maconn.SetReadDeadline(t)
}
func (c *singleConn) SetWriteDeadline(t time.Time) error {
return c.maconn.SetWriteDeadline(t)
}
// LocalMultiaddr is the Multiaddr on this side
func (c *singleConn) LocalMultiaddr() ma.Multiaddr {
return c.maconn.LocalMultiaddr()
}
// RemoteMultiaddr is the Multiaddr on the remote side
func (c *singleConn) RemoteMultiaddr() ma.Multiaddr {
return c.maconn.RemoteMultiaddr()
}
// LocalPeer is the Peer on this side
func (c *singleConn) LocalPeer() peer.ID {
return c.local
}
// RemotePeer is the Peer on the remote side
func (c *singleConn) RemotePeer() peer.ID {
return c.remote
}
// Read reads data, net.Conn style
func (c *singleConn) Read(buf []byte) (int, error) {
return c.maconn.Read(buf)
}
// Write writes data, net.Conn style
func (c *singleConn) Write(buf []byte) (int, error) {
return c.maconn.Write(buf)
}
// ID returns the ID of a given Conn.
func ID(c Conn) string {
l := fmt.Sprintf("%s/%s", c.LocalMultiaddr(), c.LocalPeer().Pretty())
r := fmt.Sprintf("%s/%s", c.RemoteMultiaddr(), c.RemotePeer().Pretty())
lh := u.Hash([]byte(l))
rh := u.Hash([]byte(r))
ch := u.XOR(lh, rh)
return peer.ID(ch).Pretty()
}
// String returns the user-friendly String representation of a conn
func String(c Conn, typ string) string {
return fmt.Sprintf("%s (%s) <-- %s %p --> (%s) %s",
c.LocalPeer(), c.LocalMultiaddr(), typ, c, c.RemoteMultiaddr(), c.RemotePeer())
}
package conn
import (
"bytes"
"fmt"
"runtime"
"sync"
"testing"
"time"
"context"
msgio "github.com/jbenet/go-msgio"
travis "github.com/libp2p/go-libp2p/testutil/ci/travis"
)
func msgioWrap(c Conn) msgio.ReadWriter {
return msgio.NewReadWriter(c)
}
func testOneSendRecv(t *testing.T, c1, c2 Conn) {
mc1 := msgioWrap(c1)
mc2 := msgioWrap(c2)
log.Debugf("testOneSendRecv from %s to %s", c1.LocalPeer(), c2.LocalPeer())
m1 := []byte("hello")
if err := mc1.WriteMsg(m1); err != nil {
t.Fatal(err)
}
m2, err := mc2.ReadMsg()
if err != nil {
t.Fatal(err)
}
if !bytes.Equal(m1, m2) {
t.Fatal("failed to send: %s %s", m1, m2)
}
}
func testNotOneSendRecv(t *testing.T, c1, c2 Conn) {
mc1 := msgioWrap(c1)
mc2 := msgioWrap(c2)
m1 := []byte("hello")
if err := mc1.WriteMsg(m1); err == nil {
t.Fatal("write should have failed", err)
}
_, err := mc2.ReadMsg()
if err == nil {
t.Fatal("read should have failed", err)
}
}
func TestClose(t *testing.T) {
// t.Skip("Skipping in favor of another test")
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
c1, c2, _, _ := setupSingleConn(t, ctx)
testOneSendRecv(t, c1, c2)
testOneSendRecv(t, c2, c1)
c1.Close()
testNotOneSendRecv(t, c1, c2)
c2.Close()
testNotOneSendRecv(t, c2, c1)
testNotOneSendRecv(t, c1, c2)
}
func TestCloseLeak(t *testing.T) {
// t.Skip("Skipping in favor of another test")
if testing.Short() {
t.SkipNow()
}
if travis.IsRunning() {
t.Skip("this doesn't work well on travis")
}
var wg sync.WaitGroup
runPair := func(num int) {
ctx, cancel := context.WithCancel(context.Background())
c1, c2, _, _ := setupSingleConn(t, ctx)
mc1 := msgioWrap(c1)
mc2 := msgioWrap(c2)
for i := 0; i < num; i++ {
b1 := []byte(fmt.Sprintf("beep%d", i))
mc1.WriteMsg(b1)
b2, err := mc2.ReadMsg()
if err != nil {
panic(err)
}
if !bytes.Equal(b1, b2) {
panic(fmt.Errorf("bytes not equal: %s != %s", b1, b2))
}
b2 = []byte(fmt.Sprintf("boop%d", i))
mc2.WriteMsg(b2)
b1, err = mc1.ReadMsg()
if err != nil {
panic(err)
}
if !bytes.Equal(b1, b2) {
panic(fmt.Errorf("bytes not equal: %s != %s", b1, b2))
}
<-time.After(time.Microsecond * 5)
}
c1.Close()
c2.Close()
cancel() // close the listener
wg.Done()
}
var cons = 5
var msgs = 50
log.Debugf("Running %d connections * %d msgs.\n", cons, msgs)
for i := 0; i < cons; i++ {
wg.Add(1)
go runPair(msgs)
}
log.Debugf("Waiting...\n")
wg.Wait()
// done!
time.Sleep(time.Millisecond * 150)
ngr := runtime.NumGoroutine()
if ngr > 25 {
// note, this is really innacurate
//panic("uncomment me to debug")
t.Fatal("leaking goroutines:", ngr)
}
}
package conn
import (
"fmt"
"math/rand"
"strings"
"time"
"context"
ci "github.com/ipfs/go-libp2p-crypto"
lgbl "github.com/ipfs/go-libp2p-loggables"
peer "github.com/ipfs/go-libp2p-peer"
ma "github.com/jbenet/go-multiaddr"
manet "github.com/jbenet/go-multiaddr-net"
transport "github.com/libp2p/go-libp2p-transport"
addrutil "github.com/libp2p/go-libp2p/p2p/net/swarm/addr"
msmux "github.com/whyrusleeping/go-multistream"
)
type WrapFunc func(transport.Conn) transport.Conn
func NewDialer(p peer.ID, pk ci.PrivKey, wrap WrapFunc) *Dialer {
return &Dialer{
LocalPeer: p,
PrivateKey: pk,
Wrapper: wrap,
fallback: new(transport.FallbackDialer),
}
}
// String returns the string rep of d.
func (d *Dialer) String() string {
return fmt.Sprintf("<Dialer %s ...>", d.LocalPeer)
}
// Dial connects to a peer over a particular address
// Ensures raddr is part of peer.Addresses()
// Example: d.DialAddr(ctx, peer.Addresses()[0], peer)
func (d *Dialer) Dial(ctx context.Context, raddr ma.Multiaddr, remote peer.ID) (Conn, error) {
logdial := lgbl.Dial("conn", d.LocalPeer, remote, nil, raddr)
logdial["encrypted"] = (d.PrivateKey != nil) // log wether this will be an encrypted dial or not.
defer log.EventBegin(ctx, "connDial", logdial).Done()
var connOut Conn
var errOut error
done := make(chan struct{})
// do it async to ensure we respect don contexteone
go func() {
defer func() {
select {
case done <- struct{}{}:
case <-ctx.Done():
}
}()
maconn, err := d.rawConnDial(ctx, raddr, remote)
if err != nil {
errOut = err
return
}
if d.Wrapper != nil {
maconn = d.Wrapper(maconn)
}
cryptoProtoChoice := SecioTag
if !EncryptConnections || d.PrivateKey == nil {
cryptoProtoChoice = NoEncryptionTag
}
maconn.SetReadDeadline(time.Now().Add(NegotiateReadTimeout))
err = msmux.SelectProtoOrFail(cryptoProtoChoice, maconn)
if err != nil {
errOut = err
return
}
maconn.SetReadDeadline(time.Time{})
c, err := newSingleConn(ctx, d.LocalPeer, remote, maconn)
if err != nil {
maconn.Close()
errOut = err
return
}
if d.PrivateKey == nil || !EncryptConnections {
log.Warning("dialer %s dialing INSECURELY %s at %s!", d, remote, raddr)
connOut = c
return
}
c2, err := newSecureConn(ctx, d.PrivateKey, c)
if err != nil {
errOut = err
c.Close()
return
}
connOut = c2
}()
select {
case <-ctx.Done():
logdial["error"] = ctx.Err()
logdial["dial"] = "failure"
return nil, ctx.Err()
case <-done:
// whew, finished.
}
if errOut != nil {
logdial["error"] = errOut
logdial["dial"] = "failure"
return nil, errOut
}
logdial["dial"] = "success"
return connOut, nil
}
func (d *Dialer) AddDialer(pd transport.Dialer) {
d.Dialers = append(d.Dialers, pd)
}
// returns dialer that can dial the given address
func (d *Dialer) subDialerForAddr(raddr ma.Multiaddr) transport.Dialer {
for _, pd := range d.Dialers {
if pd.Matches(raddr) {
return pd
}
}
if d.fallback.Matches(raddr) {
return d.fallback
}
return nil
}
// rawConnDial dials the underlying net.Conn + manet.Conns
func (d *Dialer) rawConnDial(ctx context.Context, raddr ma.Multiaddr, remote peer.ID) (transport.Conn, error) {
if strings.HasPrefix(raddr.String(), "/ip4/0.0.0.0") {
log.Event(ctx, "connDialZeroAddr", lgbl.Dial("conn", d.LocalPeer, remote, nil, raddr))
return nil, fmt.Errorf("Attempted to connect to zero address: %s", raddr)
}
sd := d.subDialerForAddr(raddr)
if sd == nil {
return nil, fmt.Errorf("no dialer for %s", raddr)
}
return sd.DialContext(ctx, raddr)
}
func pickLocalAddr(laddrs []ma.Multiaddr, raddr ma.Multiaddr) (laddr ma.Multiaddr) {
if len(laddrs) < 1 {
return nil
}
// make sure that we ONLY use local addrs that match the remote addr.
laddrs = manet.AddrMatch(raddr, laddrs)
if len(laddrs) < 1 {
return nil
}
// make sure that we ONLY use local addrs that CAN dial the remote addr.
// filter out all the local addrs that aren't capable
raddrIPLayer := ma.Split(raddr)[0]
raddrIsLoopback := manet.IsIPLoopback(raddrIPLayer)
raddrIsLinkLocal := manet.IsIP6LinkLocal(raddrIPLayer)
laddrs = addrutil.FilterAddrs(laddrs, func(a ma.Multiaddr) bool {
laddrIPLayer := ma.Split(a)[0]
laddrIsLoopback := manet.IsIPLoopback(laddrIPLayer)
laddrIsLinkLocal := manet.IsIP6LinkLocal(laddrIPLayer)
if laddrIsLoopback { // our loopback addrs can only dial loopbacks.
return raddrIsLoopback
}
if laddrIsLinkLocal {
return raddrIsLinkLocal // out linklocal addrs can only dial link locals.
}
return true
})
// TODO pick with a good heuristic
// we use a random one for now to prevent bad addresses from making nodes unreachable
// with a random selection, multiple tries may work.
return laddrs[rand.Intn(len(laddrs))]
}
// MultiaddrProtocolsMatch returns whether two multiaddrs match in protocol stacks.
func MultiaddrProtocolsMatch(a, b ma.Multiaddr) bool {
ap := a.Protocols()
bp := b.Protocols()
if len(ap) != len(bp) {
return false
}
for i, api := range ap {
if api.Code != bp[i].Code {
return false
}
}
return true
}
// MultiaddrNetMatch returns the first Multiaddr found to match network.
func MultiaddrNetMatch(tgt ma.Multiaddr, srcs []ma.Multiaddr) ma.Multiaddr {
for _, a := range srcs {
if MultiaddrProtocolsMatch(tgt, a) {
return a
}
}
return nil
}
package conn
import (
"bytes"
"fmt"
"io"
"net"
"runtime"
"strings"
"sync"
"testing"
"time"
ic "github.com/ipfs/go-libp2p-crypto"
peer "github.com/ipfs/go-libp2p-peer"
transport "github.com/libp2p/go-libp2p-transport"
tu "github.com/libp2p/go-libp2p/testutil"
tcpt "github.com/libp2p/go-tcp-transport"
"context"
ma "github.com/jbenet/go-multiaddr"
msmux "github.com/whyrusleeping/go-multistream"
grc "github.com/whyrusleeping/gorocheck"
)
func goroFilter(r *grc.Goroutine) bool {
return strings.Contains(r.Function, "go-log.") || strings.Contains(r.Stack[0], "testing.(*T).Run")
}
func echoListen(ctx context.Context, listener Listener) {
for {
c, err := listener.Accept()
if err != nil {
select {
case <-ctx.Done():
return
default:
}
if ne, ok := err.(net.Error); ok && ne.Temporary() {
<-time.After(time.Microsecond * 10)
continue
}
log.Debugf("echoListen: listener appears to be closing")
return
}
go echo(c.(Conn))
}
}
func echo(c Conn) {
io.Copy(c, c)
}
func setupSecureConn(t *testing.T, ctx context.Context) (a, b Conn, p1, p2 tu.PeerNetParams) {
return setupConn(t, ctx, true)
}
func setupSingleConn(t *testing.T, ctx context.Context) (a, b Conn, p1, p2 tu.PeerNetParams) {
return setupConn(t, ctx, false)
}
func Listen(ctx context.Context, addr ma.Multiaddr, local peer.ID, sk ic.PrivKey) (Listener, error) {
list, err := tcpt.NewTCPTransport().Listen(addr)
if err != nil {
return nil, err
}
return WrapTransportListener(ctx, list, local, sk)
}
func dialer(t *testing.T, a ma.Multiaddr) transport.Dialer {
tpt := tcpt.NewTCPTransport()
tptd, err := tpt.Dialer(a)
if err != nil {
t.Fatal(err)
}
return tptd
}
func setupConn(t *testing.T, ctx context.Context, secure bool) (a, b Conn, p1, p2 tu.PeerNetParams) {
p1 = tu.RandPeerNetParamsOrFatal(t)
p2 = tu.RandPeerNetParamsOrFatal(t)
key1 := p1.PrivKey
key2 := p2.PrivKey
if !secure {
key1 = nil
key2 = nil
}
l1, err := Listen(ctx, p1.Addr, p1.ID, key1)
if err != nil {
t.Fatal(err)
}
p1.Addr = l1.Multiaddr() // Addr has been determined by kernel.
d2 := &Dialer{
LocalPeer: p2.ID,
PrivateKey: key2,
}
d2.AddDialer(dialer(t, p2.Addr))
var c2 Conn
done := make(chan error)
go func() {
defer close(done)
var err error
c2, err = d2.Dial(ctx, p1.Addr, p1.ID)
if err != nil {
done <- err
return
}
// if secure, need to read + write, as that's what triggers the handshake.
if secure {
if err := sayHello(c2); err != nil {
done <- err
}
}
}()
c1, err := l1.Accept()
if err != nil {
t.Fatal("failed to accept", err)
}
// if secure, need to read + write, as that's what triggers the handshake.
if secure {
if err := sayHello(c1); err != nil {
done <- err
}
}
if err := <-done; err != nil {
t.Fatal(err)
}
return c1.(Conn), c2, p1, p2
}
func sayHello(c net.Conn) error {
h := []byte("hello")
if _, err := c.Write(h); err != nil {
return err
}
if _, err := c.Read(h); err != nil {
return err
}
if string(h) != "hello" {
return fmt.Errorf("did not get hello")
}
return nil
}
func testDialer(t *testing.T, secure bool) {
// t.Skip("Skipping in favor of another test")
p1 := tu.RandPeerNetParamsOrFatal(t)
p2 := tu.RandPeerNetParamsOrFatal(t)
key1 := p1.PrivKey
key2 := p2.PrivKey
if !secure {
key1 = nil
key2 = nil
t.Log("testing insecurely")
} else {
t.Log("testing securely")
}
ctx, cancel := context.WithCancel(context.Background())
l1, err := Listen(ctx, p1.Addr, p1.ID, key1)
if err != nil {
t.Fatal(err)
}
p1.Addr = l1.Multiaddr() // Addr has been determined by kernel.
d2 := &Dialer{
LocalPeer: p2.ID,
PrivateKey: key2,
}
d2.AddDialer(dialer(t, p2.Addr))
go echoListen(ctx, l1)
c, err := d2.Dial(ctx, p1.Addr, p1.ID)
if err != nil {
t.Fatal("error dialing peer", err)
}
// fmt.Println("sending")
mc := msgioWrap(c)
mc.WriteMsg([]byte("beep"))
mc.WriteMsg([]byte("boop"))
out, err := mc.ReadMsg()
if err != nil {
t.Fatal(err)
}
// fmt.Println("recving", string(out))
data := string(out)
if data != "beep" {
t.Error("unexpected conn output", data)
}
out, err = mc.ReadMsg()
if err != nil {
t.Fatal(err)
}
data = string(out)
if string(out) != "boop" {
t.Error("unexpected conn output", data)
}
// fmt.Println("closing")
c.Close()
l1.Close()
cancel()
}
func TestDialerInsecure(t *testing.T) {
// t.Skip("Skipping in favor of another test")
testDialer(t, false)
}
func TestDialerSecure(t *testing.T) {
// t.Skip("Skipping in favor of another test")
testDialer(t, true)
}
func testDialerCloseEarly(t *testing.T, secure bool) {
// t.Skip("Skipping in favor of another test")
p1 := tu.RandPeerNetParamsOrFatal(t)
p2 := tu.RandPeerNetParamsOrFatal(t)
key1 := p1.PrivKey
if !secure {
key1 = nil
t.Log("testing insecurely")
} else {
t.Log("testing securely")
}
ctx, cancel := context.WithCancel(context.Background())
l1, err := Listen(ctx, p1.Addr, p1.ID, key1)
if err != nil {
t.Fatal(err)
}
p1.Addr = l1.Multiaddr() // Addr has been determined by kernel.
// lol nesting
d2 := &Dialer{
LocalPeer: p2.ID,
PrivateKey: p2.PrivKey, //-- dont give it key. we'll just close the conn.
}
d2.AddDialer(dialer(t, p2.Addr))
errs := make(chan error, 100)
done := make(chan struct{}, 1)
gotclosed := make(chan struct{}, 1)
go func() {
defer func() { done <- struct{}{} }()
c, err := l1.Accept()
if err != nil {
if strings.Contains(err.Error(), "closed") {
gotclosed <- struct{}{}
return
}
errs <- err
}
if _, err := c.Write([]byte("hello")); err != nil {
gotclosed <- struct{}{}
return
}
errs <- fmt.Errorf("wrote to conn")
}()
c, err := d2.Dial(ctx, p1.Addr, p1.ID)
if err != nil {
t.Fatal(err)
}
c.Close() // close it early.
readerrs := func() {
for {
select {
case e := <-errs:
t.Error(e)
default:
return
}
}
}
readerrs()
l1.Close()
<-done
cancel()
readerrs()
close(errs)
select {
case <-gotclosed:
default:
t.Error("did not get closed")
}
}
// we dont do a handshake with singleConn, so cant "close early."
// func TestDialerCloseEarlyInsecure(t *testing.T) {
// // t.Skip("Skipping in favor of another test")
// testDialerCloseEarly(t, false)
// }
func TestDialerCloseEarlySecure(t *testing.T) {
// t.Skip("Skipping in favor of another test")
testDialerCloseEarly(t, true)
}
func TestMultistreamHeader(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
p1 := tu.RandPeerNetParamsOrFatal(t)
l1, err := Listen(ctx, p1.Addr, p1.ID, p1.PrivKey)
if err != nil {
t.Fatal(err)
}
p1.Addr = l1.Multiaddr() // Addr has been determined by kernel.
go func() {
_, _ = l1.Accept()
}()
con, err := net.Dial("tcp", l1.Addr().String())
if err != nil {
t.Fatal(err)
}
defer con.Close()
err = msmux.SelectProtoOrFail(SecioTag, con)
if err != nil {
t.Fatal(err)
}
}
func TestFailedAccept(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
p1 := tu.RandPeerNetParamsOrFatal(t)
l1, err := Listen(ctx, p1.Addr, p1.ID, p1.PrivKey)
if err != nil {
t.Fatal(err)
}
p1.Addr = l1.Multiaddr() // Addr has been determined by kernel.
done := make(chan struct{})
go func() {
defer close(done)
con, err := net.Dial("tcp", l1.Addr().String())
if err != nil {
t.Error("first dial failed: ", err)
}
// write some garbage
con.Write(bytes.Repeat([]byte{255}, 1000))
con.Close()
con, err = net.Dial("tcp", l1.Addr().String())
if err != nil {
t.Error("second dial failed: ", err)
}
defer con.Close()
err = msmux.SelectProtoOrFail(SecioTag, con)
if err != nil {
t.Error("msmux select failed: ", err)
}
}()
c, err := l1.Accept()
if err != nil {
t.Fatal("connections after a failed accept should still work: ", err)
}
c.Close()
<-done
}
func TestHangingAccept(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
p1 := tu.RandPeerNetParamsOrFatal(t)
l1, err := Listen(ctx, p1.Addr, p1.ID, p1.PrivKey)
if err != nil {
t.Fatal(err)
}
p1.Addr = l1.Multiaddr() // Addr has been determined by kernel.
done := make(chan struct{})
go func() {
defer close(done)
con, err := net.Dial("tcp", l1.Addr().String())
if err != nil {
t.Error("first dial failed: ", err)
}
// hang this connection
defer con.Close()
// ensure that the first conn hits first
time.Sleep(time.Millisecond * 50)
con2, err := net.Dial("tcp", l1.Addr().String())
if err != nil {
t.Error("second dial failed: ", err)
}
defer con2.Close()
err = msmux.SelectProtoOrFail(SecioTag, con2)
if err != nil {
t.Error("msmux select failed: ", err)
}
_, err = con2.Write([]byte("test"))
if err != nil {
t.Error("con write failed: ", err)
}
}()
c, err := l1.Accept()
if err != nil {
t.Fatal("connections after a failed accept should still work: ", err)
}
c.Close()
<-done
}
// This test kicks off N (=300) concurrent dials, which wait d (=20ms) seconds before failing.
// That wait holds up the handshake (multistream AND crypto), which will happen BEFORE
// l1.Accept() returns a connection. This test checks that the handshakes all happen
// concurrently in the listener side, and not sequentially. This ensures that a hanging dial
// will not block the listener from accepting other dials concurrently.
func TestConcurrentAccept(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
p1 := tu.RandPeerNetParamsOrFatal(t)
l1, err := Listen(ctx, p1.Addr, p1.ID, p1.PrivKey)
if err != nil {
t.Fatal(err)
}
n := 300
delay := time.Millisecond * 20
if runtime.GOOS == "darwin" {
n = 100
}
p1.Addr = l1.Multiaddr() // Addr has been determined by kernel.
var wg sync.WaitGroup
for i := 0; i < n; i++ {
wg.Add(1)
go func() {
defer wg.Done()
con, err := net.Dial("tcp", l1.Addr().String())
if err != nil {
log.Error(err)
t.Error("first dial failed: ", err)
return
}
// hang this connection
defer con.Close()
time.Sleep(delay)
err = msmux.SelectProtoOrFail(SecioTag, con)
if err != nil {
t.Error(err)
}
}()
}
before := time.Now()
for i := 0; i < n; i++ {
c, err := l1.Accept()
if err != nil {
t.Fatal("connections after a failed accept should still work: ", err)
}
c.Close()
}
limit := delay * time.Duration(n)
took := time.Since(before)
if took > limit {
t.Fatal("took too long!")
}
log.Errorf("took: %s (less than %s)", took, limit)
l1.Close()
wg.Wait()
cancel()
time.Sleep(time.Millisecond * 100)
err = grc.CheckForLeaks(goroFilter)
if err != nil {
t.Fatal(err)
}
}
func TestConnectionTimeouts(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
old := NegotiateReadTimeout
NegotiateReadTimeout = time.Second * 5
defer func() { NegotiateReadTimeout = old }()
p1 := tu.RandPeerNetParamsOrFatal(t)
l1, err := Listen(ctx, p1.Addr, p1.ID, p1.PrivKey)
if err != nil {
t.Fatal(err)
}
n := 100
if runtime.GOOS == "darwin" {
n = 50
}
p1.Addr = l1.Multiaddr() // Addr has been determined by kernel.
var wg sync.WaitGroup
for i := 0; i < n; i++ {
wg.Add(1)
go func() {
defer wg.Done()
con, err := net.Dial("tcp", l1.Addr().String())
if err != nil {
log.Error(err)
t.Error("first dial failed: ", err)
return
}
defer con.Close()
// hang this connection until timeout
io.ReadFull(con, make([]byte, 1000))
}()
}
// wait to make sure the hanging dials have started
time.Sleep(time.Millisecond * 50)
good_n := 20
for i := 0; i < good_n; i++ {
wg.Add(1)
go func() {
defer wg.Done()
con, err := net.Dial("tcp", l1.Addr().String())
if err != nil {
log.Error(err)
t.Error("first dial failed: ", err)
return
}
defer con.Close()
// dial these ones through
err = msmux.SelectProtoOrFail(SecioTag, con)
if err != nil {
t.Error(err)
}
}()
}
before := time.Now()
for i := 0; i < good_n; i++ {
c, err := l1.Accept()
if err != nil {
t.Fatal("connections during hung dials should still work: ", err)
}
c.Close()
}
took := time.Since(before)
if took > time.Second*5 {
t.Fatal("hanging dials shouldnt block good dials")
}
wg.Wait()
go func() {
con, err := net.Dial("tcp", l1.Addr().String())
if err != nil {
log.Error(err)
t.Error("first dial failed: ", err)
return
}
defer con.Close()
// dial these ones through
err = msmux.SelectProtoOrFail(SecioTag, con)
if err != nil {
t.Error(err)
}
}()
// make sure we can dial in still after a bunch of timeouts
con, err := l1.Accept()
if err != nil {
t.Fatal(err)
}
con.Close()
l1.Close()
cancel()
time.Sleep(time.Millisecond * 100)
err = grc.CheckForLeaks(goroFilter)
if err != nil {
t.Fatal(err)
}
}
package conn
import (
"io"
"net"
"time"
ic "github.com/ipfs/go-libp2p-crypto"
peer "github.com/ipfs/go-libp2p-peer"
ma "github.com/jbenet/go-multiaddr"
transport "github.com/libp2p/go-libp2p-transport"
filter "github.com/libp2p/go-maddr-filter"
)
type PeerConn interface {
io.Closer
// LocalPeer (this side) ID, PrivateKey, and Address
LocalPeer() peer.ID
LocalPrivateKey() ic.PrivKey
LocalMultiaddr() ma.Multiaddr
// RemotePeer ID, PublicKey, and Address
RemotePeer() peer.ID
RemotePublicKey() ic.PubKey
RemoteMultiaddr() ma.Multiaddr
}
// Conn is a generic message-based Peer-to-Peer connection.
type Conn interface {
PeerConn
// ID is an identifier unique to this connection.
ID() string
// can't just say "net.Conn" cause we have duplicate methods.
LocalAddr() net.Addr
RemoteAddr() net.Addr
SetDeadline(t time.Time) error
SetReadDeadline(t time.Time) error
SetWriteDeadline(t time.Time) error
io.Reader
io.Writer
}
// Dialer is an object that can open connections. We could have a "convenience"
// Dial function as before, but it would have many arguments, as dialing is
// no longer simple (need a peerstore, a local peer, a context, a network, etc)
type Dialer struct {
// LocalPeer is the identity of the local Peer.
LocalPeer peer.ID
// LocalAddrs is a set of local addresses to use.
//LocalAddrs []ma.Multiaddr
// Dialers are the sub-dialers usable by this dialer
// selected in order based on the address being dialed
Dialers []transport.Dialer
// PrivateKey used to initialize a secure connection.
// Warning: if PrivateKey is nil, connection will not be secured.
PrivateKey ic.PrivKey
// Wrapper to wrap the raw connection (optional)
Wrapper WrapFunc
fallback transport.Dialer
}
// Listener is an object that can accept connections. It matches net.Listener
type Listener interface {
// Accept waits for and returns the next connection to the listener.
Accept() (net.Conn, error)
// Addr is the local address
Addr() net.Addr
// Multiaddr is the local multiaddr address
Multiaddr() ma.Multiaddr
// LocalPeer is the identity of the local Peer.
LocalPeer() peer.ID
SetAddrFilters(*filter.Filters)
// Close closes the listener.
// Any blocked Accept operations will be unblocked and return errors.
Close() error
}
// EncryptConnections is a global parameter because it should either be
// enabled or _completely disabled_. I.e. a node should only be able to talk
// to proper (encrypted) networks if it is encrypting all its transports.
// Running a node with disabled transport encryption is useful to debug the
// protocols, achieve implementation interop, or for private networks which
// -- for whatever reason -- _must_ run unencrypted.
var EncryptConnections = true
package conn
import (
"context"
"fmt"
"io"
"net"
"sync"
"time"
ic "github.com/ipfs/go-libp2p-crypto"
peer "github.com/ipfs/go-libp2p-peer"
ma "github.com/jbenet/go-multiaddr"
tec "github.com/jbenet/go-temp-err-catcher"
"github.com/jbenet/goprocess"
goprocessctx "github.com/jbenet/goprocess/context"
transport "github.com/libp2p/go-libp2p-transport"
filter "github.com/libp2p/go-maddr-filter"
msmux "github.com/whyrusleeping/go-multistream"
)
const (
SecioTag = "/secio/1.0.0"
NoEncryptionTag = "/plaintext/1.0.0"
)
var (
connAcceptBuffer = 32
NegotiateReadTimeout = time.Second * 60
)
// ConnWrapper is any function that wraps a raw multiaddr connection
type ConnWrapper func(transport.Conn) transport.Conn
// listener is an object that can accept connections. It implements Listener
type listener struct {
transport.Listener
local peer.ID // LocalPeer is the identity of the local Peer
privk ic.PrivKey // private key to use to initialize secure conns
filters *filter.Filters
wrapper ConnWrapper
catcher tec.TempErrCatcher
proc goprocess.Process
mux *msmux.MultistreamMuxer
incoming chan connErr
ctx context.Context
}
func (l *listener) teardown() error {
defer log.Debugf("listener closed: %s %s", l.local, l.Multiaddr())
return l.Listener.Close()
}
func (l *listener) Close() error {
log.Debugf("listener closing: %s %s", l.local, l.Multiaddr())
return l.proc.Close()
}
func (l *listener) String() string {
return fmt.Sprintf("<Listener %s %s>", l.local, l.Multiaddr())
}
func (l *listener) SetAddrFilters(fs *filter.Filters) {
l.filters = fs
}
type connErr struct {
conn transport.Conn
err error
}
// Accept waits for and returns the next connection to the listener.
// Note that unfortunately this
func (l *listener) Accept() (net.Conn, error) {
for con := range l.incoming {
if con.err != nil {
return nil, con.err
}
c, err := newSingleConn(l.ctx, l.local, "", con.conn)
if err != nil {
con.conn.Close()
if l.catcher.IsTemporary(err) {
continue
}
return nil, err
}
if l.privk == nil || !EncryptConnections {
log.Warning("listener %s listening INSECURELY!", l)
return c, nil
}
sc, err := newSecureConn(l.ctx, l.privk, c)
if err != nil {
con.conn.Close()
log.Infof("ignoring conn we failed to secure: %s %s", err, c)
continue
}
return sc, nil
}
return nil, fmt.Errorf("listener is closed")
}
func (l *listener) Addr() net.Addr {
return l.Listener.Addr()
}
// Multiaddr is the identity of the local Peer.
// If there is an error converting from net.Addr to ma.Multiaddr,
// the return value will be nil.
func (l *listener) Multiaddr() ma.Multiaddr {
return l.Listener.Multiaddr()
}
// LocalPeer is the identity of the local Peer.
func (l *listener) LocalPeer() peer.ID {
return l.local
}
func (l *listener) Loggable() map[string]interface{} {
return map[string]interface{}{
"listener": map[string]interface{}{
"peer": l.LocalPeer(),
"address": l.Multiaddr(),
"secure": (l.privk != nil),
},
}
}
func (l *listener) handleIncoming() {
var wg sync.WaitGroup
defer func() {
wg.Wait()
close(l.incoming)
}()
wg.Add(1)
defer wg.Done()
for {
maconn, err := l.Listener.Accept()
if err != nil {
if l.catcher.IsTemporary(err) {
continue
}
l.incoming <- connErr{err: err}
return
}
log.Debugf("listener %s got connection: %s <---> %s", l, maconn.LocalMultiaddr(), maconn.RemoteMultiaddr())
if l.filters != nil && l.filters.AddrBlocked(maconn.RemoteMultiaddr()) {
log.Debugf("blocked connection from %s", maconn.RemoteMultiaddr())
maconn.Close()
continue
}
// If we have a wrapper func, wrap this conn
if l.wrapper != nil {
maconn = l.wrapper(maconn)
}
wg.Add(1)
go func() {
defer wg.Done()
maconn.SetReadDeadline(time.Now().Add(NegotiateReadTimeout))
_, _, err = l.mux.Negotiate(maconn)
if err != nil {
log.Info("incoming conn: negotiation of crypto protocol failed: ", err)
maconn.Close()
return
}
// clear read readline
maconn.SetReadDeadline(time.Time{})
l.incoming <- connErr{conn: maconn}
}()
}
}
func WrapTransportListener(ctx context.Context, ml transport.Listener, local peer.ID, sk ic.PrivKey) (Listener, error) {
l := &listener{
Listener: ml,
local: local,
privk: sk,
mux: msmux.NewMultistreamMuxer(),
incoming: make(chan connErr, connAcceptBuffer),
ctx: ctx,
}
l.proc = goprocessctx.WithContextAndTeardown(ctx, l.teardown)
l.catcher.IsTemp = func(e error) bool {
// ignore connection breakages up to this point. but log them
if e == io.EOF {
log.Debugf("listener ignoring conn with EOF: %s", e)
return true
}
te, ok := e.(tec.Temporary)
if ok {
log.Debugf("listener ignoring conn with temporary err: %s", e)
return te.Temporary()
}
return false
}
if EncryptConnections && sk != nil {
l.mux.AddHandler(SecioTag, nil)
} else {
l.mux.AddHandler(NoEncryptionTag, nil)
}
go l.handleIncoming()
log.Debugf("Conn Listener on %s", l.Multiaddr())
log.Event(ctx, "swarmListen", l)
return l, nil
}
type ListenerConnWrapper interface {
SetConnWrapper(ConnWrapper)
}
// SetConnWrapper assigns a maconn ConnWrapper to wrap all incoming
// connections with. MUST be set _before_ calling `Accept()`
func (l *listener) SetConnWrapper(cw ConnWrapper) {
l.wrapper = cw
}
package conn
import (
"context"
"errors"
"net"
"time"
ic "github.com/ipfs/go-libp2p-crypto"
peer "github.com/ipfs/go-libp2p-peer"
secio "github.com/ipfs/go-libp2p-secio"
ma "github.com/jbenet/go-multiaddr"
)
// secureConn wraps another Conn object with an encrypted channel.
type secureConn struct {
insecure Conn // the wrapped conn
secure secio.Session // secure Session
}
// newConn constructs a new connection
func newSecureConn(ctx context.Context, sk ic.PrivKey, insecure Conn) (Conn, error) {
if insecure == nil {
return nil, errors.New("insecure is nil")
}
if insecure.LocalPeer() == "" {
return nil, errors.New("insecure.LocalPeer() is nil")
}
if sk == nil {
return nil, errors.New("private key is nil")
}
// NewSession performs the secure handshake, which takes multiple RTT
sessgen := secio.SessionGenerator{LocalID: insecure.LocalPeer(), PrivateKey: sk}
secure, err := sessgen.NewSession(ctx, insecure)
if err != nil {
return nil, err
}
conn := &secureConn{
insecure: insecure,
secure: secure,
}
return conn, nil
}
func (c *secureConn) Close() error {
return c.secure.Close()
}
// ID is an identifier unique to this connection.
func (c *secureConn) ID() string {
return ID(c)
}
func (c *secureConn) String() string {
return String(c, "secureConn")
}
func (c *secureConn) LocalAddr() net.Addr {
return c.insecure.LocalAddr()
}
func (c *secureConn) RemoteAddr() net.Addr {
return c.insecure.RemoteAddr()
}
func (c *secureConn) SetDeadline(t time.Time) error {
return c.insecure.SetDeadline(t)
}
func (c *secureConn) SetReadDeadline(t time.Time) error {
return c.insecure.SetReadDeadline(t)
}
func (c *secureConn) SetWriteDeadline(t time.Time) error {
return c.insecure.SetWriteDeadline(t)
}
// LocalMultiaddr is the Multiaddr on this side
func (c *secureConn) LocalMultiaddr() ma.Multiaddr {
return c.insecure.LocalMultiaddr()
}
// RemoteMultiaddr is the Multiaddr on the remote side
func (c *secureConn) RemoteMultiaddr() ma.Multiaddr {
return c.insecure.RemoteMultiaddr()
}
// LocalPeer is the Peer on this side
func (c *secureConn) LocalPeer() peer.ID {
return c.secure.LocalPeer()
}
// RemotePeer is the Peer on the remote side
func (c *secureConn) RemotePeer() peer.ID {
return c.secure.RemotePeer()
}
// LocalPrivateKey is the public key of the peer on this side
func (c *secureConn) LocalPrivateKey() ic.PrivKey {
return c.secure.LocalPrivateKey()
}
// RemotePubKey is the public key of the peer on the remote side
func (c *secureConn) RemotePublicKey() ic.PubKey {
return c.secure.RemotePublicKey()
}
// Read reads data, net.Conn style
func (c *secureConn) Read(buf []byte) (int, error) {
return c.secure.ReadWriter().Read(buf)
}
// Write writes data, net.Conn style
func (c *secureConn) Write(buf []byte) (int, error) {
return c.secure.ReadWriter().Write(buf)
}
// ReleaseMsg releases a buffer
func (c *secureConn) ReleaseMsg(m []byte) {
c.secure.ReadWriter().ReleaseMsg(m)
}
package conn
import (
"bytes"
"context"
"runtime"
"sync"
"testing"
"time"
ic "github.com/ipfs/go-libp2p-crypto"
travis "github.com/libp2p/go-libp2p/testutil/ci/travis"
)
func upgradeToSecureConn(t *testing.T, ctx context.Context, sk ic.PrivKey, c Conn) (Conn, error) {
if c, ok := c.(*secureConn); ok {
return c, nil
}
// shouldn't happen, because dial + listen already return secure conns.
s, err := newSecureConn(ctx, sk, c)
if err != nil {
return nil, err
}
// need to read + write, as that's what triggers the handshake.
h := []byte("hello")
if _, err := s.Write(h); err != nil {
return nil, err
}
if _, err := s.Read(h); err != nil {
return nil, err
}
return s, nil
}
func secureHandshake(t *testing.T, ctx context.Context, sk ic.PrivKey, c Conn, done chan error) {
_, err := upgradeToSecureConn(t, ctx, sk, c)
done <- err
}
func TestSecureSimple(t *testing.T) {
// t.Skip("Skipping in favor of another test")
numMsgs := 100
if testing.Short() {
numMsgs = 10
}
ctx := context.Background()
c1, c2, p1, p2 := setupSingleConn(t, ctx)
done := make(chan error)
go secureHandshake(t, ctx, p1.PrivKey, c1, done)
go secureHandshake(t, ctx, p2.PrivKey, c2, done)
for i := 0; i < 2; i++ {
if err := <-done; err != nil {
t.Fatal(err)
}
}
for i := 0; i < numMsgs; i++ {
testOneSendRecv(t, c1, c2)
testOneSendRecv(t, c2, c1)
}
c1.Close()
c2.Close()
}
func TestSecureClose(t *testing.T) {
// t.Skip("Skipping in favor of another test")
ctx := context.Background()
c1, c2, p1, p2 := setupSingleConn(t, ctx)
done := make(chan error)
go secureHandshake(t, ctx, p1.PrivKey, c1, done)
go secureHandshake(t, ctx, p2.PrivKey, c2, done)
for i := 0; i < 2; i++ {
if err := <-done; err != nil {
t.Fatal(err)
}
}
testOneSendRecv(t, c1, c2)
c1.Close()
testNotOneSendRecv(t, c1, c2)
c2.Close()
testNotOneSendRecv(t, c1, c2)
testNotOneSendRecv(t, c2, c1)
}
func TestSecureCancelHandshake(t *testing.T) {
// t.Skip("Skipping in favor of another test")
ctx, cancel := context.WithCancel(context.Background())
c1, c2, p1, p2 := setupSingleConn(t, ctx)
done := make(chan error)
go secureHandshake(t, ctx, p1.PrivKey, c1, done)
time.Sleep(time.Millisecond)
cancel() // cancel ctx
go secureHandshake(t, ctx, p2.PrivKey, c2, done)
for i := 0; i < 2; i++ {
if err := <-done; err == nil {
t.Error("cancel should've errored out")
}
}
}
func TestSecureHandshakeFailsWithWrongKeys(t *testing.T) {
// t.Skip("Skipping in favor of another test")
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
c1, c2, p1, p2 := setupSingleConn(t, ctx)
done := make(chan error)
go secureHandshake(t, ctx, p2.PrivKey, c1, done)
go secureHandshake(t, ctx, p1.PrivKey, c2, done)
for i := 0; i < 2; i++ {
if err := <-done; err == nil {
t.Fatal("wrong keys should've errored out.")
}
}
}
func TestSecureCloseLeak(t *testing.T) {
// t.Skip("Skipping in favor of another test")
if testing.Short() {
t.SkipNow()
}
if travis.IsRunning() {
t.Skip("this doesn't work well on travis")
}
runPair := func(c1, c2 Conn, num int) {
mc1 := msgioWrap(c1)
mc2 := msgioWrap(c2)
log.Debugf("runPair %d", num)
for i := 0; i < num; i++ {
log.Debugf("runPair iteration %d", i)
b1 := []byte("beep")
mc1.WriteMsg(b1)
b2, err := mc2.ReadMsg()
if err != nil {
panic(err)
}
if !bytes.Equal(b1, b2) {
panic("bytes not equal")
}
b2 = []byte("beep")
mc2.WriteMsg(b2)
b1, err = mc1.ReadMsg()
if err != nil {
panic(err)
}
if !bytes.Equal(b1, b2) {
panic("bytes not equal")
}
time.Sleep(time.Microsecond * 5)
}
}
var cons = 5
var msgs = 50
log.Debugf("Running %d connections * %d msgs.\n", cons, msgs)
var wg sync.WaitGroup
for i := 0; i < cons; i++ {
wg.Add(1)
ctx, cancel := context.WithCancel(context.Background())
c1, c2, _, _ := setupSecureConn(t, ctx)
go func(c1, c2 Conn) {
defer func() {
c1.Close()
c2.Close()
cancel()
wg.Done()
}()
runPair(c1, c2, msgs)
}(c1, c2)
}
log.Debugf("Waiting...")
wg.Wait()
// done!
time.Sleep(time.Millisecond * 150)
ngr := runtime.NumGoroutine()
if ngr > 25 {
// panic("uncomment me to debug")
t.Fatal("leaking goroutines:", ngr)
}
}
......@@ -8,8 +8,8 @@ import (
pstore "github.com/ipfs/go-libp2p-peerstore"
ma "github.com/jbenet/go-multiaddr"
"github.com/jbenet/goprocess"
conn "github.com/libp2p/go-libp2p-conn"
protocol "github.com/libp2p/go-libp2p-protocol"
conn "github.com/libp2p/go-libp2p/p2p/net/conn"
)
// MessageSizeMax is a soft (recommended) maximum for network messages.
......
package mocknet
import (
"context"
"fmt"
"sort"
"sync"
......@@ -9,15 +10,14 @@ import (
bhost "github.com/libp2p/go-libp2p/p2p/host/basic"
inet "github.com/libp2p/go-libp2p/p2p/net"
p2putil "github.com/libp2p/go-libp2p/p2p/test/util"
testutil "github.com/libp2p/go-libp2p/testutil"
"context"
ic "github.com/ipfs/go-libp2p-crypto"
peer "github.com/ipfs/go-libp2p-peer"
pstore "github.com/ipfs/go-libp2p-peerstore"
ma "github.com/jbenet/go-multiaddr"
"github.com/jbenet/goprocess"
goprocessctx "github.com/jbenet/goprocess/context"
testutil "github.com/libp2p/go-testutil"
)
// mocknet implements mocknet.Mocknet
......
......@@ -2,6 +2,7 @@ package mocknet
import (
"bytes"
"context"
"io"
"math"
"math/rand"
......@@ -9,13 +10,12 @@ import (
"testing"
"time"
peer "github.com/ipfs/go-libp2p-peer"
protocol "github.com/libp2p/go-libp2p-protocol"
inet "github.com/libp2p/go-libp2p/p2p/net"
testutil "github.com/libp2p/go-libp2p/testutil"
"context"
peer "github.com/ipfs/go-libp2p-peer"
detectrace "github.com/jbenet/go-detect-race"
protocol "github.com/libp2p/go-libp2p-protocol"
testutil "github.com/libp2p/go-testutil"
)
func randPeer(t *testing.T) peer.ID {
......
package addrutil
import (
"fmt"
"context"
logging "github.com/ipfs/go-log"
ma "github.com/jbenet/go-multiaddr"
manet "github.com/jbenet/go-multiaddr-net"
_ "github.com/whyrusleeping/ws-transport"
)
var log = logging.Logger("github.com/libp2p/go-libp2p/p2p/net/swarm/addr")
// SupportedTransportStrings is the list of supported transports for the swarm.
// These are strings of encapsulated multiaddr protocols. E.g.:
// /ip4/tcp
var SupportedTransportStrings = []string{
"/ip4/tcp",
"/ip6/tcp",
"/ip4/udp/utp",
"/ip6/udp/utp",
"/ip4/tcp/ws",
"/ip6/tcp/ws",
// "/ip4/udp/udt", disabled because the lib doesnt work on arm
// "/ip6/udp/udt", disabled because the lib doesnt work on arm
}
// SupportedTransportProtocols is the list of supported transports for the swarm.
// These are []ma.Protocol lists. Populated at runtime from SupportedTransportStrings
var SupportedTransportProtocols = [][]ma.Protocol{}
func init() {
// initialize SupportedTransportProtocols
transports := make([][]ma.Protocol, len(SupportedTransportStrings))
for _, s := range SupportedTransportStrings {
t, err := ma.ProtocolsWithString(s)
if err != nil {
panic(err) // important to fix this in the codebase
}
transports = append(transports, t)
}
SupportedTransportProtocols = transports
}
// FilterAddrs is a filter that removes certain addresses, according the given filters.
// if all filters return true, the address is kept.
func FilterAddrs(a []ma.Multiaddr, filters ...func(ma.Multiaddr) bool) []ma.Multiaddr {
b := make([]ma.Multiaddr, 0, len(a))
for _, addr := range a {
good := true
for _, filter := range filters {
good = good && filter(addr)
}
if good {
b = append(b, addr)
}
}
return b
}
// FilterUsableAddrs removes certain addresses
// from a list. the addresses removed are those known NOT
// to work with our network. Namely, addresses with UTP.
func FilterUsableAddrs(a []ma.Multiaddr) []ma.Multiaddr {
return FilterAddrs(a, AddrUsableFunc)
}
func AddrUsableFunc(m ma.Multiaddr) bool {
return AddrUsable(m, false)
}
// AddrOverNonLocalIP returns whether the addr uses a non-local ip link
func AddrOverNonLocalIP(a ma.Multiaddr) bool {
split := ma.Split(a)
if len(split) < 1 {
return false
}
if manet.IsIP6LinkLocal(split[0]) {
return false
}
return true
}
// AddrUsable returns whether our network can use this addr.
// We only use the transports in SupportedTransportStrings,
// and we do not link local addresses. Loopback is ok
// as we need to be able to connect to multiple ipfs nodes
// in the same machine.
func AddrUsable(a ma.Multiaddr, partial bool) bool {
if a == nil {
return false
}
if !AddrOverNonLocalIP(a) {
return false
}
// test the address protocol list is in SupportedTransportProtocols
matches := func(supported, test []ma.Protocol) bool {
if len(test) > len(supported) {
return false
}
// when partial, it's ok if test < supported.
if !partial && len(supported) != len(test) {
return false
}
for i := range test {
if supported[i].Code != test[i].Code {
return false
}
}
return true
}
transport := a.Protocols()
for _, supported := range SupportedTransportProtocols {
if matches(supported, transport) {
return true
}
}
return false
}
// ResolveUnspecifiedAddress expands an unspecified ip addresses (/ip4/0.0.0.0, /ip6/::) to
// use the known local interfaces. If ifaceAddr is nil, we request interface addresses
// from the network stack. (this is so you can provide a cached value if resolving many addrs)
func ResolveUnspecifiedAddress(resolve ma.Multiaddr, ifaceAddrs []ma.Multiaddr) ([]ma.Multiaddr, error) {
// split address into its components
split := ma.Split(resolve)
// if first component (ip) is not unspecified, use it as is.
if !manet.IsIPUnspecified(split[0]) {
return []ma.Multiaddr{resolve}, nil
}
out := make([]ma.Multiaddr, 0, len(ifaceAddrs))
for _, ia := range ifaceAddrs {
// must match the first protocol to be resolve.
if ia.Protocols()[0].Code != resolve.Protocols()[0].Code {
continue
}
split[0] = ia
joined := ma.Join(split...)
out = append(out, joined)
log.Debug("adding resolved addr:", resolve, joined, out)
}
if len(out) < 1 {
return nil, fmt.Errorf("failed to resolve: %s", resolve)
}
return out, nil
}
// ResolveUnspecifiedAddresses expands unspecified ip addresses (/ip4/0.0.0.0, /ip6/::) to
// use the known local interfaces.
func ResolveUnspecifiedAddresses(unspecAddrs, ifaceAddrs []ma.Multiaddr) ([]ma.Multiaddr, error) {
// todo optimize: only fetch these if we have a "any" addr.
if len(ifaceAddrs) < 1 {
var err error
ifaceAddrs, err = InterfaceAddresses()
if err != nil {
return nil, err
}
// log.Debug("InterfaceAddresses:", ifaceAddrs)
}
var outputAddrs []ma.Multiaddr
for _, a := range unspecAddrs {
// unspecified?
resolved, err := ResolveUnspecifiedAddress(a, ifaceAddrs)
if err != nil {
continue // optimistic. if we cant resolve anything, we'll know at the bottom.
}
// log.Debug("resolved:", a, resolved)
outputAddrs = append(outputAddrs, resolved...)
}
if len(outputAddrs) < 1 {
return nil, fmt.Errorf("failed to specify addrs: %s", unspecAddrs)
}
log.Event(context.TODO(), "interfaceListenAddresses", func() logging.Loggable {
var addrs []string
for _, addr := range outputAddrs {
addrs = append(addrs, addr.String())
}
return logging.Metadata{"addresses": addrs}
}())
log.Debug("ResolveUnspecifiedAddresses:", unspecAddrs, ifaceAddrs, outputAddrs)
return outputAddrs, nil
}
// InterfaceAddresses returns a list of addresses associated with local machine
// Note: we do not return link local addresses. IP loopback is ok, because we
// may be connecting to other nodes in the same machine.
func InterfaceAddresses() ([]ma.Multiaddr, error) {
maddrs, err := manet.InterfaceMultiaddrs()
if err != nil {
return nil, err
}
log.Debug("InterfaceAddresses: from manet:", maddrs)
var out []ma.Multiaddr
for _, a := range maddrs {
if !AddrUsable(a, true) { // partial
// log.Debug("InterfaceAddresses: skipping unusable:", a)
continue
}
out = append(out, a)
}
log.Debug("InterfaceAddresses: usable:", out)
return out, nil
}
// AddrInList returns whether or not an address is part of a list.
// this is useful to check if NAT is happening (or other bugs?)
func AddrInList(addr ma.Multiaddr, list []ma.Multiaddr) bool {
for _, addr2 := range list {
if addr.Equal(addr2) {
return true
}
}
return false
}
// AddrIsShareableOnWAN returns whether the given address should be shareable on the
// wide area network (wide internet).
func AddrIsShareableOnWAN(addr ma.Multiaddr) bool {
s := ma.Split(addr)
if len(s) < 1 {
return false
}
a := s[0]
if manet.IsIPLoopback(a) || manet.IsIP6LinkLocal(a) || manet.IsIPUnspecified(a) {
return false
}
return manet.IsThinWaist(a)
}
// WANShareableAddrs filters addresses based on whether they're shareable on WAN
func WANShareableAddrs(inp []ma.Multiaddr) []ma.Multiaddr {
return FilterAddrs(inp, AddrIsShareableOnWAN)
}
// Subtract filters out all addrs in b from a
func Subtract(a, b []ma.Multiaddr) []ma.Multiaddr {
return FilterAddrs(a, func(m ma.Multiaddr) bool {
for _, bb := range b {
if m.Equal(bb) {
return false
}
}
return true
})
}
// CheckNATWarning checks if our observed addresses differ. if so,
// informs the user that certain things might not work yet
func CheckNATWarning(observed, expected ma.Multiaddr, listen []ma.Multiaddr) {
if observed.Equal(expected) {
return
}
if !AddrInList(observed, listen) { // probably a nat
log.Warningf(natWarning, observed, listen)
}
}
const natWarning = `Remote peer observed our address to be: %s
The local addresses are: %s
Thus, connection is going through NAT, and other connections may fail.
IPFS NAT traversal is still under development. Please bug us on github or irc to fix this.
Baby steps: http://jbenet.static.s3.amazonaws.com/271dfcf/baby-steps.gif
`
package addrutil
import (
"testing"
ma "github.com/jbenet/go-multiaddr"
manet "github.com/jbenet/go-multiaddr-net"
)
func newMultiaddr(t *testing.T, s string) ma.Multiaddr {
maddr, err := ma.NewMultiaddr(s)
if err != nil {
t.Fatal(err)
}
return maddr
}
func TestFilterAddrs(t *testing.T) {
bad := []ma.Multiaddr{
newMultiaddr(t, "/ip4/1.2.3.4/udp/1234"), // unreliable
newMultiaddr(t, "/ip4/1.2.3.4/udp/1234/sctp/1234"), // not in manet
newMultiaddr(t, "/ip4/1.2.3.4/udp/1234/udt"), // udt is broken on arm
newMultiaddr(t, "/ip6/fe80::1/tcp/1234"), // link local
newMultiaddr(t, "/ip6/fe80::100/tcp/1234"), // link local
}
good := []ma.Multiaddr{
newMultiaddr(t, "/ip4/127.0.0.1/tcp/1234"),
newMultiaddr(t, "/ip6/::1/tcp/1234"),
newMultiaddr(t, "/ip4/1.2.3.4/udp/1234/utp"),
newMultiaddr(t, "/ip4/1.2.3.4/tcp/1234/ws"),
}
goodAndBad := append(good, bad...)
// test filters
for _, a := range bad {
if AddrUsable(a, false) {
t.Errorf("addr %s should be unusable", a)
}
}
for _, a := range good {
if !AddrUsable(a, false) {
t.Errorf("addr %s should be usable", a)
}
}
subtestAddrsEqual(t, FilterUsableAddrs(bad), []ma.Multiaddr{})
subtestAddrsEqual(t, FilterUsableAddrs(good), good)
subtestAddrsEqual(t, FilterUsableAddrs(goodAndBad), good)
}
func subtestAddrsEqual(t *testing.T, a, b []ma.Multiaddr) {
if len(a) != len(b) {
t.Error(t)
}
in := func(addr ma.Multiaddr, l []ma.Multiaddr) bool {
for _, addr2 := range l {
if addr.Equal(addr2) {
return true
}
}
return false
}
for _, aa := range a {
if !in(aa, b) {
t.Errorf("%s not in %s", aa, b)
}
}
}
func TestInterfaceAddrs(t *testing.T) {
addrs, err := InterfaceAddresses()
if err != nil {
t.Fatal(err)
}
if len(addrs) < 1 {
t.Error("no addresses")
}
for _, a := range addrs {
if manet.IsIP6LinkLocal(a) {
t.Error("should not return ip link local addresses", a)
}
}
if len(addrs) < 1 {
t.Error("no good interface addrs")
}
}
func TestResolvingAddrs(t *testing.T) {
unspec := []ma.Multiaddr{
newMultiaddr(t, "/ip4/0.0.0.0/tcp/1234"),
newMultiaddr(t, "/ip4/1.2.3.4/tcp/1234"),
newMultiaddr(t, "/ip6/::/tcp/1234"),
newMultiaddr(t, "/ip6/::100/tcp/1234"),
}
iface := []ma.Multiaddr{
newMultiaddr(t, "/ip4/127.0.0.1"),
newMultiaddr(t, "/ip4/10.20.30.40"),
newMultiaddr(t, "/ip6/::1"),
newMultiaddr(t, "/ip6/::f"),
}
spec := []ma.Multiaddr{
newMultiaddr(t, "/ip4/127.0.0.1/tcp/1234"),
newMultiaddr(t, "/ip4/10.20.30.40/tcp/1234"),
newMultiaddr(t, "/ip4/1.2.3.4/tcp/1234"),
newMultiaddr(t, "/ip6/::1/tcp/1234"),
newMultiaddr(t, "/ip6/::f/tcp/1234"),
newMultiaddr(t, "/ip6/::100/tcp/1234"),
}
actual, err := ResolveUnspecifiedAddresses(unspec, iface)
if err != nil {
t.Fatal(err)
}
for i, a := range actual {
if !a.Equal(spec[i]) {
t.Error(a, " != ", spec[i])
}
}
ip4u := []ma.Multiaddr{newMultiaddr(t, "/ip4/0.0.0.0")}
ip4i := []ma.Multiaddr{newMultiaddr(t, "/ip4/1.2.3.4")}
ip6u := []ma.Multiaddr{newMultiaddr(t, "/ip6/::")}
ip6i := []ma.Multiaddr{newMultiaddr(t, "/ip6/::1")}
if _, err := ResolveUnspecifiedAddress(ip4u[0], ip6i); err == nil {
t.Fatal("should have failed")
}
if _, err := ResolveUnspecifiedAddress(ip6u[0], ip4i); err == nil {
t.Fatal("should have failed")
}
if _, err := ResolveUnspecifiedAddresses(ip6u, ip4i); err == nil {
t.Fatal("should have failed")
}
if _, err := ResolveUnspecifiedAddresses(ip4u, ip6i); err == nil {
t.Fatal("should have failed")
}
}
func TestWANShareable(t *testing.T) {
wanok := []ma.Multiaddr{
newMultiaddr(t, "/ip4/1.2.3.4/tcp/1234"),
newMultiaddr(t, "/ip6/abcd::1/tcp/1234"),
}
wanbad := []ma.Multiaddr{
newMultiaddr(t, "/ip4/127.0.0.1/tcp/1234"),
newMultiaddr(t, "/ip4/0.0.0.0/tcp/1234"),
newMultiaddr(t, "/ip6/::1/tcp/1234"),
newMultiaddr(t, "/ip6/::/tcp/1234"),
newMultiaddr(t, "/ip6/fe80::1/tcp/1234"),
newMultiaddr(t, "/ip6/fe80::/tcp/1234"),
}
for _, a := range wanok {
if !AddrIsShareableOnWAN(a) {
t.Error("should be true", a)
}
}
for _, a := range wanbad {
if AddrIsShareableOnWAN(a) {
t.Error("should be false", a)
}
}
wanok2 := WANShareableAddrs(wanok)
if len(wanok) != len(wanok2) {
t.Error("should be the same")
}
wanbad2 := WANShareableAddrs(wanbad)
if len(wanbad2) != 0 {
t.Error("should be zero")
}
}
func TestSubtract(t *testing.T) {
a := []ma.Multiaddr{
newMultiaddr(t, "/ip4/127.0.0.1/tcp/1234"),
newMultiaddr(t, "/ip4/0.0.0.0/tcp/1234"),
newMultiaddr(t, "/ip6/::1/tcp/1234"),
newMultiaddr(t, "/ip6/::/tcp/1234"),
newMultiaddr(t, "/ip6/fe80::1/tcp/1234"),
newMultiaddr(t, "/ip6/fe80::/tcp/1234"),
}
b := []ma.Multiaddr{
newMultiaddr(t, "/ip4/127.0.0.1/tcp/1234"),
newMultiaddr(t, "/ip6/::1/tcp/1234"),
newMultiaddr(t, "/ip6/fe80::1/tcp/1234"),
}
c1 := []ma.Multiaddr{
newMultiaddr(t, "/ip4/0.0.0.0/tcp/1234"),
newMultiaddr(t, "/ip6/::/tcp/1234"),
newMultiaddr(t, "/ip6/fe80::/tcp/1234"),
}
c2 := Subtract(a, b)
if len(c1) != len(c2) {
t.Error("should be the same")
}
for i, ca := range c1 {
if !c2[i].Equal(ca) {
t.Error("should be the same", ca, c2[i])
}
}
}
package addrutil
import (
ma "github.com/jbenet/go-multiaddr"
mafmt "github.com/whyrusleeping/mafmt"
)
// SubtractFilter returns a filter func that filters all of the given addresses
func SubtractFilter(addrs ...ma.Multiaddr) func(ma.Multiaddr) bool {
addrmap := make(map[string]bool)
for _, a := range addrs {
addrmap[string(a.Bytes())] = true
}
return func(a ma.Multiaddr) bool {
return !addrmap[string(a.Bytes())]
}
}
// IsFDCostlyTransport returns true for transports that require a new file
// descriptor per connection created
func IsFDCostlyTransport(a ma.Multiaddr) bool {
return mafmt.TCP.Matches(a)
}
// FilterNeg returns a negated version of the passed in filter
func FilterNeg(f func(ma.Multiaddr) bool) func(ma.Multiaddr) bool {
return func(a ma.Multiaddr) bool {
return !f(a)
}
}
package swarm
import (
"context"
"net"
"sync"
"testing"
"time"
addrutil "github.com/libp2p/go-libp2p/p2p/net/swarm/addr"
testutil "github.com/libp2p/go-libp2p/testutil"
ci "github.com/libp2p/go-libp2p/testutil/ci"
"context"
peer "github.com/ipfs/go-libp2p-peer"
pstore "github.com/ipfs/go-libp2p-peerstore"
ma "github.com/jbenet/go-multiaddr"
manet "github.com/jbenet/go-multiaddr-net"
addrutil "github.com/libp2p/go-addr-util"
testutil "github.com/libp2p/go-testutil"
ci "github.com/libp2p/go-testutil/ci"
)
func closeSwarms(swarms []*Swarm) {
......
......@@ -6,9 +6,8 @@ import (
peer "github.com/ipfs/go-libp2p-peer"
ma "github.com/jbenet/go-multiaddr"
conn "github.com/libp2p/go-libp2p/p2p/net/conn"
addrutil "github.com/libp2p/go-libp2p/p2p/net/swarm/addr"
addrutil "github.com/libp2p/go-addr-util"
conn "github.com/libp2p/go-libp2p-conn"
)
type dialResult struct {
......
......@@ -10,9 +10,8 @@ import (
peer "github.com/ipfs/go-libp2p-peer"
ma "github.com/jbenet/go-multiaddr"
conn "github.com/libp2p/go-libp2p-conn"
mafmt "github.com/whyrusleeping/mafmt"
conn "github.com/libp2p/go-libp2p/p2p/net/conn"
)
func mustAddr(t *testing.T, s string) ma.Multiaddr {
......
package swarm
import (
"context"
"runtime"
"sync"
"testing"
"time"
ci "github.com/libp2p/go-libp2p/testutil/ci"
"context"
peer "github.com/ipfs/go-libp2p-peer"
pstore "github.com/ipfs/go-libp2p-peerstore"
ma "github.com/jbenet/go-multiaddr"
ci "github.com/libp2p/go-testutil/ci"
)
func TestSimultOpen(t *testing.T) {
......
......@@ -14,8 +14,6 @@ import (
metrics "github.com/libp2p/go-libp2p/p2p/metrics"
mconn "github.com/libp2p/go-libp2p/p2p/metrics/conn"
inet "github.com/libp2p/go-libp2p/p2p/net"
conn "github.com/libp2p/go-libp2p/p2p/net/conn"
addrutil "github.com/libp2p/go-libp2p/p2p/net/swarm/addr"
ci "github.com/ipfs/go-libp2p-crypto"
peer "github.com/ipfs/go-libp2p-peer"
......@@ -26,6 +24,8 @@ import (
pst "github.com/jbenet/go-stream-muxer"
"github.com/jbenet/goprocess"
goprocessctx "github.com/jbenet/goprocess/context"
addrutil "github.com/libp2p/go-addr-util"
conn "github.com/libp2p/go-libp2p-conn"
transport "github.com/libp2p/go-libp2p-transport"
filter "github.com/libp2p/go-maddr-filter"
tcpt "github.com/libp2p/go-tcp-transport"
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment