Commit 1600dc35 authored by Jeromy Johnson's avatar Jeromy Johnson Committed by GitHub
Browse files

Merge pull request #134 from libp2p/feat/extracting-5

extract libp2p-swarm
parents 10689ca6 7aced371
......@@ -8,14 +8,14 @@ import (
"log"
"strings"
bhost "github.com/libp2p/go-libp2p/p2p/host/basic"
host "github.com/libp2p/go-libp2p-host"
inet "github.com/libp2p/go-libp2p-net"
net "github.com/libp2p/go-libp2p-net"
bhost "github.com/libp2p/go-libp2p/p2p/host/basic"
swarm "github.com/libp2p/go-libp2p/p2p/net/swarm"
peer "github.com/libp2p/go-libp2p-peer"
pstore "github.com/libp2p/go-libp2p-peerstore"
swarm "github.com/libp2p/go-libp2p-swarm"
testutil "github.com/libp2p/go-testutil"
ma "github.com/multiformats/go-multiaddr"
)
......
......@@ -5,7 +5,7 @@ import (
"fmt"
"os"
"github.com/libp2p/go-libp2p/p2p/net/swarm"
"github.com/libp2p/go-libp2p-swarm"
tcpt "github.com/libp2p/go-tcp-transport"
ma "github.com/multiformats/go-multiaddr"
)
......
package swarm
import (
"context"
"sync"
peer "github.com/libp2p/go-libp2p-peer"
)
type DialFunc func(context.Context, peer.ID) (*Conn, error)
func NewDialSync(dfn DialFunc) *DialSync {
return &DialSync{
dials: make(map[peer.ID]*activeDial),
dialFunc: dfn,
}
}
type DialSync struct {
dials map[peer.ID]*activeDial
dialsLk sync.Mutex
dialFunc DialFunc
}
type activeDial struct {
id peer.ID
refCnt int
refCntLk sync.Mutex
cancel func()
err error
conn *Conn
waitch chan struct{}
ds *DialSync
}
func (dr *activeDial) wait(ctx context.Context) (*Conn, error) {
defer dr.decref()
select {
case <-dr.waitch:
return dr.conn, dr.err
case <-ctx.Done():
return nil, ctx.Err()
}
}
func (ad *activeDial) incref() {
ad.refCntLk.Lock()
defer ad.refCntLk.Unlock()
ad.refCnt++
}
func (ad *activeDial) decref() {
ad.refCntLk.Lock()
defer ad.refCntLk.Unlock()
ad.refCnt--
if ad.refCnt <= 0 {
ad.cancel()
ad.ds.dialsLk.Lock()
delete(ad.ds.dials, ad.id)
ad.ds.dialsLk.Unlock()
}
}
func (ds *DialSync) DialLock(ctx context.Context, p peer.ID) (*Conn, error) {
ds.dialsLk.Lock()
actd, ok := ds.dials[p]
if !ok {
ctx, cancel := context.WithCancel(context.Background())
actd = &activeDial{
id: p,
cancel: cancel,
waitch: make(chan struct{}),
ds: ds,
}
ds.dials[p] = actd
go func(ctx context.Context, p peer.ID, ad *activeDial) {
ad.conn, ad.err = ds.dialFunc(ctx, p)
close(ad.waitch)
ad.cancel()
ad.waitch = nil // to ensure nobody tries reusing this
}(ctx, p, actd)
}
actd.incref()
ds.dialsLk.Unlock()
return actd.wait(ctx)
}
package swarm
import (
"context"
"fmt"
"sync"
"testing"
"time"
peer "github.com/libp2p/go-libp2p-peer"
)
func getMockDialFunc() (DialFunc, func(), context.Context, <-chan struct{}) {
dfcalls := make(chan struct{}, 512) // buffer it large enough that we won't care
dialctx, cancel := context.WithCancel(context.Background())
ch := make(chan struct{})
f := func(ctx context.Context, p peer.ID) (*Conn, error) {
dfcalls <- struct{}{}
defer cancel()
select {
case <-ch:
return new(Conn), nil
case <-ctx.Done():
return nil, ctx.Err()
}
}
o := new(sync.Once)
return f, func() { o.Do(func() { close(ch) }) }, dialctx, dfcalls
}
func TestBasicDialSync(t *testing.T) {
df, done, _, callsch := getMockDialFunc()
dsync := NewDialSync(df)
p := peer.ID("testpeer")
ctx := context.Background()
finished := make(chan struct{})
go func() {
_, err := dsync.DialLock(ctx, p)
if err != nil {
t.Error(err)
}
finished <- struct{}{}
}()
go func() {
_, err := dsync.DialLock(ctx, p)
if err != nil {
t.Error(err)
}
finished <- struct{}{}
}()
// short sleep just to make sure we've moved around in the scheduler
time.Sleep(time.Millisecond * 20)
done()
<-finished
<-finished
if len(callsch) > 1 {
t.Fatal("should only have called dial func once!")
}
}
func TestDialSyncCancel(t *testing.T) {
df, done, _, dcall := getMockDialFunc()
dsync := NewDialSync(df)
p := peer.ID("testpeer")
ctx1, cancel1 := context.WithCancel(context.Background())
finished := make(chan struct{})
go func() {
_, err := dsync.DialLock(ctx1, p)
if err != ctx1.Err() {
t.Error("should have gotten context error")
}
finished <- struct{}{}
}()
// make sure the above makes it through the wait code first
select {
case <-dcall:
case <-time.After(time.Second):
t.Fatal("timed out waiting for dial to start")
}
// Add a second dialwait in so two actors are waiting on the same dial
go func() {
_, err := dsync.DialLock(context.Background(), p)
if err != nil {
t.Error(err)
}
finished <- struct{}{}
}()
time.Sleep(time.Millisecond * 20)
// cancel the first dialwait, it should not affect the second at all
cancel1()
select {
case <-finished:
case <-time.After(time.Second):
t.Fatal("timed out waiting for wait to exit")
}
// short sleep just to make sure we've moved around in the scheduler
time.Sleep(time.Millisecond * 20)
done()
<-finished
}
func TestDialSyncAllCancel(t *testing.T) {
df, done, dctx, _ := getMockDialFunc()
dsync := NewDialSync(df)
p := peer.ID("testpeer")
ctx1, cancel1 := context.WithCancel(context.Background())
finished := make(chan struct{})
go func() {
_, err := dsync.DialLock(ctx1, p)
if err != ctx1.Err() {
t.Error("should have gotten context error")
}
finished <- struct{}{}
}()
// Add a second dialwait in so two actors are waiting on the same dial
go func() {
_, err := dsync.DialLock(ctx1, p)
if err != ctx1.Err() {
t.Error("should have gotten context error")
}
finished <- struct{}{}
}()
cancel1()
for i := 0; i < 2; i++ {
select {
case <-finished:
case <-time.After(time.Second):
t.Fatal("timed out waiting for wait to exit")
}
}
// the dial should have exited now
select {
case <-dctx.Done():
case <-time.After(time.Second):
t.Fatal("timed out waiting for dial to return")
}
// should be able to successfully dial that peer again
done()
_, err := dsync.DialLock(context.Background(), p)
if err != nil {
t.Fatal(err)
}
}
func TestFailFirst(t *testing.T) {
var count int
f := func(ctx context.Context, p peer.ID) (*Conn, error) {
if count > 0 {
return new(Conn), nil
}
count++
return nil, fmt.Errorf("gophers ate the modem")
}
ds := NewDialSync(f)
p := peer.ID("testing")
ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
defer cancel()
_, err := ds.DialLock(ctx, p)
if err == nil {
t.Fatal("expected gophers to have eaten the modem")
}
c, err := ds.DialLock(ctx, p)
if err != nil {
t.Fatal(err)
}
if c == nil {
t.Fatal("should have gotten a 'real' conn back")
}
}
package swarm
import (
"context"
"net"
"sync"
"testing"
"time"
addrutil "github.com/libp2p/go-addr-util"
peer "github.com/libp2p/go-libp2p-peer"
pstore "github.com/libp2p/go-libp2p-peerstore"
testutil "github.com/libp2p/go-testutil"
ci "github.com/libp2p/go-testutil/ci"
ma "github.com/multiformats/go-multiaddr"
manet "github.com/multiformats/go-multiaddr-net"
)
func closeSwarms(swarms []*Swarm) {
for _, s := range swarms {
s.Close()
}
}
func TestBasicDial(t *testing.T) {
t.Parallel()
ctx := context.Background()
swarms := makeSwarms(ctx, t, 2)
defer closeSwarms(swarms)
s1 := swarms[0]
s2 := swarms[1]
s1.peers.AddAddrs(s2.local, s2.ListenAddresses(), pstore.PermanentAddrTTL)
c, err := s1.Dial(ctx, s2.local)
if err != nil {
t.Fatal(err)
}
s, err := c.NewStream()
if err != nil {
t.Fatal(err)
}
s.Close()
}
func TestDialWithNoListeners(t *testing.T) {
t.Parallel()
ctx := context.Background()
s1 := makeDialOnlySwarm(ctx, t)
swarms := makeSwarms(ctx, t, 1)
defer closeSwarms(swarms)
s2 := swarms[0]
s1.peers.AddAddrs(s2.local, s2.ListenAddresses(), pstore.PermanentAddrTTL)
c, err := s1.Dial(ctx, s2.local)
if err != nil {
t.Fatal(err)
}
s, err := c.NewStream()
if err != nil {
t.Fatal(err)
}
s.Close()
}
func acceptAndHang(l net.Listener) {
conns := make([]net.Conn, 0, 10)
for {
c, err := l.Accept()
if err != nil {
break
}
if c != nil {
conns = append(conns, c)
}
}
for _, c := range conns {
c.Close()
}
}
func TestSimultDials(t *testing.T) {
// t.Skip("skipping for another test")
t.Parallel()
ctx := context.Background()
swarms := makeSwarms(ctx, t, 2)
// connect everyone
{
var wg sync.WaitGroup
connect := func(s *Swarm, dst peer.ID, addr ma.Multiaddr) {
// copy for other peer
log.Debugf("TestSimultOpen: connecting: %s --> %s (%s)", s.local, dst, addr)
s.peers.AddAddr(dst, addr, pstore.TempAddrTTL)
if _, err := s.Dial(ctx, dst); err != nil {
t.Fatal("error swarm dialing to peer", err)
}
wg.Done()
}
ifaceAddrs0, err := swarms[0].InterfaceListenAddresses()
if err != nil {
t.Fatal(err)
}
ifaceAddrs1, err := swarms[1].InterfaceListenAddresses()
if err != nil {
t.Fatal(err)
}
log.Info("Connecting swarms simultaneously.")
for i := 0; i < 10; i++ { // connect 10x for each.
wg.Add(2)
go connect(swarms[0], swarms[1].local, ifaceAddrs1[0])
go connect(swarms[1], swarms[0].local, ifaceAddrs0[0])
}
wg.Wait()
}
// should still just have 1, at most 2 connections :)
c01l := len(swarms[0].ConnectionsToPeer(swarms[1].local))
if c01l > 2 {
t.Error("0->1 has", c01l)
}
c10l := len(swarms[1].ConnectionsToPeer(swarms[0].local))
if c10l > 2 {
t.Error("1->0 has", c10l)
}
for _, s := range swarms {
s.Close()
}
}
func newSilentPeer(t *testing.T) (peer.ID, ma.Multiaddr, net.Listener) {
dst := testutil.RandPeerIDFatal(t)
lst, err := net.Listen("tcp", ":0")
if err != nil {
t.Fatal(err)
}
addr, err := manet.FromNetAddr(lst.Addr())
if err != nil {
t.Fatal(err)
}
addrs := []ma.Multiaddr{addr}
addrs, err = addrutil.ResolveUnspecifiedAddresses(addrs, nil)
if err != nil {
t.Fatal(err)
}
t.Log("new silent peer:", dst, addrs[0])
return dst, addrs[0], lst
}
func TestDialWait(t *testing.T) {
// t.Skip("skipping for another test")
t.Parallel()
ctx := context.Background()
swarms := makeSwarms(ctx, t, 1)
s1 := swarms[0]
defer s1.Close()
s1.dialT = time.Millisecond * 300 // lower timeout for tests.
if ci.IsRunning() {
s1.dialT = time.Second
}
// dial to a non-existent peer.
s2p, s2addr, s2l := newSilentPeer(t)
go acceptAndHang(s2l)
defer s2l.Close()
s1.peers.AddAddr(s2p, s2addr, pstore.PermanentAddrTTL)
before := time.Now()
if c, err := s1.Dial(ctx, s2p); err == nil {
defer c.Close()
t.Fatal("error swarm dialing to unknown peer worked...", err)
} else {
t.Log("correctly got error:", err)
}
duration := time.Since(before)
dt := s1.dialT
if duration < dt*dialAttempts {
t.Error("< DialTimeout * dialAttempts not being respected", duration, dt*dialAttempts)
}
if duration > 2*dt*dialAttempts {
t.Error("> 2*DialTimeout * dialAttempts not being respected", duration, 2*dt*dialAttempts)
}
if !s1.backf.Backoff(s2p) {
t.Error("s2 should now be on backoff")
}
}
func TestDialBackoff(t *testing.T) {
// t.Skip("skipping for another test")
if ci.IsRunning() {
t.Skip("travis will never have fun with this test")
}
t.Parallel()
ctx := context.Background()
swarms := makeSwarms(ctx, t, 2)
s1 := swarms[0]
s2 := swarms[1]
defer s1.Close()
defer s2.Close()
s1.dialT = time.Second // lower timeout for tests.
s2.dialT = time.Second // lower timeout for tests.
s2addrs, err := s2.InterfaceListenAddresses()
if err != nil {
t.Fatal(err)
}
s1.peers.AddAddrs(s2.local, s2addrs, pstore.PermanentAddrTTL)
// dial to a non-existent peer.
s3p, s3addr, s3l := newSilentPeer(t)
go acceptAndHang(s3l)
defer s3l.Close()
s1.peers.AddAddr(s3p, s3addr, pstore.PermanentAddrTTL)
// in this test we will:
// 1) dial 10x to each node.
// 2) all dials should hang
// 3) s1->s2 should succeed.
// 4) s1->s3 should not (and should place s3 on backoff)
// 5) disconnect entirely
// 6) dial 10x to each node again
// 7) s3 dials should all return immediately (except 1)
// 8) s2 dials should all hang, and succeed
// 9) last s3 dial ends, unsuccessful
dialOnlineNode := func(dst peer.ID, times int) <-chan bool {
ch := make(chan bool)
for i := 0; i < times; i++ {
go func() {
if _, err := s1.Dial(ctx, dst); err != nil {
t.Error("error dialing", dst, err)
ch <- false
} else {
ch <- true
}
}()
}
return ch
}
dialOfflineNode := func(dst peer.ID, times int) <-chan bool {
ch := make(chan bool)
for i := 0; i < times; i++ {
go func() {
if c, err := s1.Dial(ctx, dst); err != nil {
ch <- false
} else {
t.Error("succeeded in dialing", dst)
ch <- true
c.Close()
}
}()
}
return ch
}
{
// 1) dial 10x to each node.
N := 10
s2done := dialOnlineNode(s2.local, N)
s3done := dialOfflineNode(s3p, N)
// when all dials should be done by:
dialTimeout1x := time.After(s1.dialT)
// dialTimeout1Ax := time.After(s1.dialT * 2) // dialAttempts)
dialTimeout10Ax := time.After(s1.dialT * 2 * 10) // dialAttempts * 10)
// 2) all dials should hang
select {
case <-s2done:
t.Error("s2 should not happen immediately")
case <-s3done:
t.Error("s3 should not happen yet")
case <-time.After(time.Millisecond):
// s2 may finish very quickly, so let's get out.
}
// 3) s1->s2 should succeed.
for i := 0; i < N; i++ {
select {
case r := <-s2done:
if !r {
t.Error("s2 should not fail")
}
case <-s3done:
t.Error("s3 should not happen yet")
case <-dialTimeout1x:
t.Error("s2 took too long")
}
}
select {
case <-s2done:
t.Error("s2 should have no more")
case <-s3done:
t.Error("s3 should not happen yet")
case <-dialTimeout1x: // let it pass
}
// 4) s1->s3 should not (and should place s3 on backoff)
// N-1 should finish before dialTimeout1x * 2
for i := 0; i < N; i++ {
select {
case <-s2done:
t.Error("s2 should have no more")
case r := <-s3done:
if r {
t.Error("s3 should not succeed")
}
case <-(dialTimeout1x):
if i < (N - 1) {
t.Fatal("s3 took too long")
}
t.Log("dialTimeout1x * 1.3 hit for last peer")
case <-dialTimeout10Ax:
t.Fatal("s3 took too long")
}
}
// check backoff state
if s1.backf.Backoff(s2.local) {
t.Error("s2 should not be on backoff")
}
if !s1.backf.Backoff(s3p) {
t.Error("s3 should be on backoff")
}
// 5) disconnect entirely
for _, c := range s1.Connections() {
c.Close()
}
for i := 0; i < 100 && len(s1.Connections()) > 0; i++ {
<-time.After(time.Millisecond)
}
if len(s1.Connections()) > 0 {
t.Fatal("s1 conns must exit")
}
}
{
// 6) dial 10x to each node again
N := 10
s2done := dialOnlineNode(s2.local, N)
s3done := dialOfflineNode(s3p, N)
// when all dials should be done by:
dialTimeout1x := time.After(s1.dialT)
// dialTimeout1Ax := time.After(s1.dialT * 2) // dialAttempts)
dialTimeout10Ax := time.After(s1.dialT * 2 * 10) // dialAttempts * 10)
// 7) s3 dials should all return immediately (except 1)
for i := 0; i < N-1; i++ {
select {
case <-s2done:
t.Error("s2 should not succeed yet")
case r := <-s3done:
if r {
t.Error("s3 should not succeed")
}
case <-dialTimeout1x:
t.Fatal("s3 took too long")
}
}
// 8) s2 dials should all hang, and succeed
for i := 0; i < N; i++ {
select {
case r := <-s2done:
if !r {
t.Error("s2 should succeed")
}
// case <-s3done:
case <-(dialTimeout1x):
t.Fatal("s3 took too long")
}
}
// 9) the last s3 should return, failed.
select {
case <-s2done:
t.Error("s2 should have no more")
case r := <-s3done:
if r {
t.Error("s3 should not succeed")
}
case <-dialTimeout10Ax:
t.Fatal("s3 took too long")
}
// check backoff state (the same)
if s1.backf.Backoff(s2.local) {
t.Error("s2 should not be on backoff")
}
if !s1.backf.Backoff(s3p) {
t.Error("s3 should be on backoff")
}
}
}
func TestDialBackoffClears(t *testing.T) {
// t.Skip("skipping for another test")
t.Parallel()
ctx := context.Background()
swarms := makeSwarms(ctx, t, 2)
s1 := swarms[0]
s2 := swarms[1]
defer s1.Close()
defer s2.Close()
s1.dialT = time.Millisecond * 300 // lower timeout for tests.
s2.dialT = time.Millisecond * 300 // lower timeout for tests.
if ci.IsRunning() {
s1.dialT = 2 * time.Second
s2.dialT = 2 * time.Second
}
// use another address first, that accept and hang on conns
_, s2bad, s2l := newSilentPeer(t)
go acceptAndHang(s2l)
defer s2l.Close()
// phase 1 -- dial to non-operational addresses
s1.peers.AddAddr(s2.local, s2bad, pstore.PermanentAddrTTL)
before := time.Now()
if c, err := s1.Dial(ctx, s2.local); err == nil {
t.Fatal("dialing to broken addr worked...", err)
defer c.Close()
} else {
t.Log("correctly got error:", err)
}
duration := time.Since(before)
dt := s1.dialT
if duration < dt*dialAttempts {
t.Error("< DialTimeout * dialAttempts not being respected", duration, dt*dialAttempts)
}
if duration > 2*dt*dialAttempts {
t.Error("> 2*DialTimeout * dialAttempts not being respected", duration, 2*dt*dialAttempts)
}
if !s1.backf.Backoff(s2.local) {
t.Error("s2 should now be on backoff")
} else {
t.Log("correctly added to backoff")
}
// phase 2 -- add the working address. dial should succeed.
ifaceAddrs1, err := swarms[1].InterfaceListenAddresses()
if err != nil {
t.Fatal(err)
}
s1.peers.AddAddrs(s2.local, ifaceAddrs1, pstore.PermanentAddrTTL)
if _, err := s1.Dial(ctx, s2.local); err == nil {
t.Fatal("should have failed to dial backed off peer")
}
time.Sleep(baseBackoffTime)
if c, err := s1.Dial(ctx, s2.local); err != nil {
t.Fatal(err)
} else {
c.Close()
t.Log("correctly connected")
}
if s1.backf.Backoff(s2.local) {
t.Error("s2 should no longer be on backoff")
} else {
t.Log("correctly cleared backoff")
}
}
package swarm
import (
"context"
"sync"
addrutil "github.com/libp2p/go-addr-util"
iconn "github.com/libp2p/go-libp2p-interface-conn"
peer "github.com/libp2p/go-libp2p-peer"
ma "github.com/multiformats/go-multiaddr"
)
type dialResult struct {
Conn iconn.Conn
Err error
}
type dialJob struct {
addr ma.Multiaddr
peer peer.ID
ctx context.Context
resp chan dialResult
success bool
}
func (dj *dialJob) cancelled() bool {
select {
case <-dj.ctx.Done():
return true
default:
return false
}
}
type dialLimiter struct {
rllock sync.Mutex
fdConsuming int
fdLimit int
waitingOnFd []*dialJob
dialFunc func(context.Context, peer.ID, ma.Multiaddr) (iconn.Conn, error)
activePerPeer map[peer.ID]int
perPeerLimit int
waitingOnPeerLimit map[peer.ID][]*dialJob
}
type dialfunc func(context.Context, peer.ID, ma.Multiaddr) (iconn.Conn, error)
func newDialLimiter(df dialfunc) *dialLimiter {
return newDialLimiterWithParams(df, concurrentFdDials, defaultPerPeerRateLimit)
}
func newDialLimiterWithParams(df dialfunc, fdl, ppl int) *dialLimiter {
return &dialLimiter{
fdLimit: fdl,
perPeerLimit: ppl,
waitingOnPeerLimit: make(map[peer.ID][]*dialJob),
activePerPeer: make(map[peer.ID]int),
dialFunc: df,
}
}
func (dl *dialLimiter) finishedDial(dj *dialJob) {
dl.rllock.Lock()
defer dl.rllock.Unlock()
if addrutil.IsFDCostlyTransport(dj.addr) {
dl.fdConsuming--
if len(dl.waitingOnFd) > 0 {
next := dl.waitingOnFd[0]
dl.waitingOnFd = dl.waitingOnFd[1:]
if len(dl.waitingOnFd) == 0 {
dl.waitingOnFd = nil // clear out memory
}
dl.fdConsuming++
go dl.executeDial(next)
}
}
// release tokens in reverse order than we take them
dl.activePerPeer[dj.peer]--
if dl.activePerPeer[dj.peer] == 0 {
delete(dl.activePerPeer, dj.peer)
}
waitlist := dl.waitingOnPeerLimit[dj.peer]
if !dj.success && len(waitlist) > 0 {
next := waitlist[0]
if len(waitlist) == 1 {
delete(dl.waitingOnPeerLimit, dj.peer)
} else {
dl.waitingOnPeerLimit[dj.peer] = waitlist[1:]
}
dl.activePerPeer[dj.peer]++ // just kidding, we still want this token
// can kick this off right here, dials in this list already
// have the other tokens needed
go dl.executeDial(next)
}
}
// AddDialJob tries to take the needed tokens for starting the given dial job.
// If it acquires all needed tokens, it immediately starts the dial, otherwise
// it will put it on the waitlist for the requested token.
func (dl *dialLimiter) AddDialJob(dj *dialJob) {
dl.rllock.Lock()
defer dl.rllock.Unlock()
if dl.activePerPeer[dj.peer] >= dl.perPeerLimit {
wlist := dl.waitingOnPeerLimit[dj.peer]
dl.waitingOnPeerLimit[dj.peer] = append(wlist, dj)
return
}
dl.activePerPeer[dj.peer]++
if addrutil.IsFDCostlyTransport(dj.addr) {
if dl.fdConsuming >= dl.fdLimit {
dl.waitingOnFd = append(dl.waitingOnFd, dj)
return
}
// take token
dl.fdConsuming++
}
// take second needed token and start dial!
go dl.executeDial(dj)
}
// executeDial calls the dialFunc, and reports the result through the response
// channel when finished. Once the response is sent it also releases all tokens
// it held during the dial.
func (dl *dialLimiter) executeDial(j *dialJob) {
defer dl.finishedDial(j)
if j.cancelled() {
return
}
con, err := dl.dialFunc(j.ctx, j.peer, j.addr)
select {
case j.resp <- dialResult{Conn: con, Err: err}:
case <-j.ctx.Done():
}
}
package swarm
import (
"context"
"fmt"
"math/rand"
"strconv"
"testing"
"time"
iconn "github.com/libp2p/go-libp2p-interface-conn"
peer "github.com/libp2p/go-libp2p-peer"
ma "github.com/multiformats/go-multiaddr"
mafmt "github.com/whyrusleeping/mafmt"
)
func mustAddr(t *testing.T, s string) ma.Multiaddr {
a, err := ma.NewMultiaddr(s)
if err != nil {
t.Fatal(err)
}
return a
}
func addrWithPort(t *testing.T, p int) ma.Multiaddr {
return mustAddr(t, fmt.Sprintf("/ip4/127.0.0.1/tcp/%d", p))
}
// in these tests I use addresses with tcp ports over a certain number to
// signify 'good' addresses that will succeed, and addresses below that number
// will fail. This lets us more easily test these different scenarios.
func tcpPortOver(a ma.Multiaddr, n int) bool {
port, err := a.ValueForProtocol(ma.P_TCP)
if err != nil {
panic(err)
}
pnum, err := strconv.Atoi(port)
if err != nil {
panic(err)
}
return pnum > n
}
func tryDialAddrs(ctx context.Context, l *dialLimiter, p peer.ID, addrs []ma.Multiaddr, res chan dialResult) {
for _, a := range addrs {
l.AddDialJob(&dialJob{
ctx: ctx,
peer: p,
addr: a,
resp: res,
})
}
}
func hangDialFunc(hang chan struct{}) dialfunc {
return func(ctx context.Context, p peer.ID, a ma.Multiaddr) (iconn.Conn, error) {
if mafmt.UTP.Matches(a) {
return iconn.Conn(nil), nil
}
if tcpPortOver(a, 10) {
return iconn.Conn(nil), nil
}
<-hang
return nil, fmt.Errorf("test bad dial")
}
}
func TestLimiterBasicDials(t *testing.T) {
hang := make(chan struct{})
defer close(hang)
l := newDialLimiterWithParams(hangDialFunc(hang), concurrentFdDials, 4)
bads := []ma.Multiaddr{addrWithPort(t, 1), addrWithPort(t, 2), addrWithPort(t, 3), addrWithPort(t, 4)}
good := addrWithPort(t, 20)
resch := make(chan dialResult)
pid := peer.ID("testpeer")
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
tryDialAddrs(ctx, l, pid, bads, resch)
l.AddDialJob(&dialJob{
ctx: ctx,
peer: pid,
addr: good,
resp: resch,
})
select {
case <-resch:
t.Fatal("no dials should have completed!")
case <-time.After(time.Millisecond * 100):
}
// complete a single hung dial
hang <- struct{}{}
select {
case r := <-resch:
if r.Err == nil {
t.Fatal("should have gotten failed dial result")
}
case <-time.After(time.Second):
t.Fatal("timed out waiting for dial completion")
}
select {
case r := <-resch:
if r.Err != nil {
t.Fatal("expected second result to be success!")
}
case <-time.After(time.Second):
}
}
func TestFDLimiting(t *testing.T) {
hang := make(chan struct{})
defer close(hang)
l := newDialLimiterWithParams(hangDialFunc(hang), 16, 5)
bads := []ma.Multiaddr{addrWithPort(t, 1), addrWithPort(t, 2), addrWithPort(t, 3), addrWithPort(t, 4)}
pids := []peer.ID{"testpeer1", "testpeer2", "testpeer3", "testpeer4"}
goodTCP := addrWithPort(t, 20)
ctx := context.Background()
resch := make(chan dialResult)
// take all fd limit tokens with hang dials
for _, pid := range pids {
tryDialAddrs(ctx, l, pid, bads, resch)
}
// these dials should work normally, but will hang because we have taken
// up all the fd limiting
for _, pid := range pids {
l.AddDialJob(&dialJob{
ctx: ctx,
peer: pid,
addr: goodTCP,
resp: resch,
})
}
select {
case <-resch:
t.Fatal("no dials should have completed!")
case <-time.After(time.Millisecond * 100):
}
pid5 := peer.ID("testpeer5")
utpaddr := mustAddr(t, "/ip4/127.0.0.1/udp/7777/utp")
// This should complete immediately since utp addresses arent blocked by fd rate limiting
l.AddDialJob(&dialJob{ctx: ctx, peer: pid5, addr: utpaddr, resp: resch})
select {
case res := <-resch:
if res.Err != nil {
t.Fatal("should have gotten successful response")
}
case <-time.After(time.Second * 5):
t.Fatal("timeout waiting for utp addr success")
}
}
func TestTokenRedistribution(t *testing.T) {
hangchs := make(map[peer.ID]chan struct{})
df := func(ctx context.Context, p peer.ID, a ma.Multiaddr) (iconn.Conn, error) {
if tcpPortOver(a, 10) {
return (iconn.Conn)(nil), nil
}
<-hangchs[p]
return nil, fmt.Errorf("test bad dial")
}
l := newDialLimiterWithParams(df, 8, 4)
bads := []ma.Multiaddr{addrWithPort(t, 1), addrWithPort(t, 2), addrWithPort(t, 3), addrWithPort(t, 4)}
pids := []peer.ID{"testpeer1", "testpeer2"}
ctx := context.Background()
resch := make(chan dialResult)
// take all fd limit tokens with hang dials
for _, pid := range pids {
hangchs[pid] = make(chan struct{})
tryDialAddrs(ctx, l, pid, bads, resch)
}
good := mustAddr(t, "/ip4/127.0.0.1/tcp/1001")
// add a good dial job for peer 1
l.AddDialJob(&dialJob{
ctx: ctx,
peer: pids[1],
addr: good,
resp: resch,
})
select {
case <-resch:
t.Fatal("no dials should have completed!")
case <-time.After(time.Millisecond * 100):
}
// unblock one dial for peer 0
hangchs[pids[0]] <- struct{}{}
select {
case res := <-resch:
if res.Err == nil {
t.Fatal("should have only been a failure here")
}
case <-time.After(time.Millisecond * 100):
t.Fatal("expected a dial failure here")
}
select {
case <-resch:
t.Fatal("no more dials should have completed!")
case <-time.After(time.Millisecond * 100):
}
// add a bad dial job to peer 0 to fill their rate limiter
// and test that more dials for this peer won't interfere with peer 1's successful dial incoming
l.AddDialJob(&dialJob{
ctx: ctx,
peer: pids[0],
addr: addrWithPort(t, 7),
resp: resch,
})
hangchs[pids[1]] <- struct{}{}
// now one failed dial from peer 1 should get through and fail
// which will in turn unblock the successful dial on peer 1
select {
case res := <-resch:
if res.Err == nil {
t.Fatal("should have only been a failure here")
}
case <-time.After(time.Millisecond * 100):
t.Fatal("expected a dial failure here")
}
select {
case res := <-resch:
if res.Err != nil {
t.Fatal("should have succeeded!")
}
case <-time.After(time.Millisecond * 100):
t.Fatal("should have gotten successful dial")
}
}
func TestStressLimiter(t *testing.T) {
df := func(ctx context.Context, p peer.ID, a ma.Multiaddr) (iconn.Conn, error) {
if tcpPortOver(a, 1000) {
return iconn.Conn(nil), nil
}
time.Sleep(time.Millisecond * time.Duration(5+rand.Intn(100)))
return nil, fmt.Errorf("test bad dial")
}
l := newDialLimiterWithParams(df, 20, 5)
var bads []ma.Multiaddr
for i := 0; i < 100; i++ {
bads = append(bads, addrWithPort(t, i))
}
addresses := append(bads, addrWithPort(t, 2000))
success := make(chan struct{})
for i := 0; i < 20; i++ {
go func(id peer.ID) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
resp := make(chan dialResult)
time.Sleep(time.Duration(rand.Intn(10)) * time.Millisecond)
for _, i := range rand.Perm(len(addresses)) {
l.AddDialJob(&dialJob{
addr: addresses[i],
ctx: ctx,
peer: id,
resp: resp,
})
}
for res := range resp {
if res.Err == nil {
success <- struct{}{}
return
}
}
}(peer.ID(fmt.Sprintf("testpeer%d", i)))
}
for i := 0; i < 20; i++ {
select {
case <-success:
case <-time.After(time.Second * 5):
t.Fatal("expected a success within five seconds")
}
}
}
package swarm
import (
"testing"
"context"
peer "github.com/libp2p/go-libp2p-peer"
pstore "github.com/libp2p/go-libp2p-peerstore"
ma "github.com/multiformats/go-multiaddr"
)
func TestPeers(t *testing.T) {
ctx := context.Background()
swarms := makeSwarms(ctx, t, 2)
s1 := swarms[0]
s2 := swarms[1]
connect := func(s *Swarm, dst peer.ID, addr ma.Multiaddr) {
// TODO: make a DialAddr func.
s.peers.AddAddr(dst, addr, pstore.PermanentAddrTTL)
// t.Logf("connections from %s", s.LocalPeer())
// for _, c := range s.ConnectionsToPeer(dst) {
// t.Logf("connection from %s to %s: %v", s.LocalPeer(), dst, c)
// }
// t.Logf("")
if _, err := s.Dial(ctx, dst); err != nil {
t.Fatal("error swarm dialing to peer", err)
}
// t.Log(s.swarm.Dump())
}
s1GotConn := make(chan struct{}, 0)
s2GotConn := make(chan struct{}, 0)
s1.SetConnHandler(func(c *Conn) {
s1GotConn <- struct{}{}
})
s2.SetConnHandler(func(c *Conn) {
s2GotConn <- struct{}{}
})
connect(s1, s2.LocalPeer(), s2.ListenAddresses()[0])
<-s2GotConn // have to wait here so the other side catches up.
connect(s2, s1.LocalPeer(), s1.ListenAddresses()[0])
for i := 0; i < 100; i++ {
connect(s1, s2.LocalPeer(), s2.ListenAddresses()[0])
connect(s2, s1.LocalPeer(), s1.ListenAddresses()[0])
}
for _, s := range swarms {
log.Infof("%s swarm routing table: %s", s.local, s.Peers())
}
test := func(s *Swarm) {
expect := 1
actual := len(s.Peers())
if actual != expect {
t.Errorf("%s has %d peers, not %d: %v", s.LocalPeer(), actual, expect, s.Peers())
t.Log(s.swarm.Dump())
}
actual = len(s.Connections())
if actual != expect {
t.Errorf("%s has %d conns, not %d: %v", s.LocalPeer(), actual, expect, s.Connections())
t.Log(s.swarm.Dump())
}
}
test(s1)
test(s2)
}
package swarm
import (
"context"
"runtime"
"sync"
"testing"
"time"
peer "github.com/libp2p/go-libp2p-peer"
pstore "github.com/libp2p/go-libp2p-peerstore"
ci "github.com/libp2p/go-testutil/ci"
ma "github.com/multiformats/go-multiaddr"
)
func TestSimultOpen(t *testing.T) {
t.Parallel()
ctx := context.Background()
swarms := makeSwarms(ctx, t, 2)
// connect everyone
{
var wg sync.WaitGroup
connect := func(s *Swarm, dst peer.ID, addr ma.Multiaddr) {
// copy for other peer
log.Debugf("TestSimultOpen: connecting: %s --> %s (%s)", s.local, dst, addr)
s.peers.AddAddr(dst, addr, pstore.PermanentAddrTTL)
if _, err := s.Dial(ctx, dst); err != nil {
t.Fatal("error swarm dialing to peer", err)
}
wg.Done()
}
log.Info("Connecting swarms simultaneously.")
wg.Add(2)
go connect(swarms[0], swarms[1].local, swarms[1].ListenAddresses()[0])
go connect(swarms[1], swarms[0].local, swarms[0].ListenAddresses()[0])
wg.Wait()
}
for _, s := range swarms {
s.Close()
}
}
func TestSimultOpenMany(t *testing.T) {
// t.Skip("very very slow")
addrs := 20
rounds := 10
if ci.IsRunning() || runtime.GOOS == "darwin" {
// osx has a limit of 256 file descriptors
addrs = 10
rounds = 5
}
SubtestSwarm(t, addrs, rounds)
}
func TestSimultOpenFewStress(t *testing.T) {
if testing.Short() {
t.SkipNow()
}
// t.Skip("skipping for another test")
t.Parallel()
msgs := 40
swarms := 2
rounds := 10
// rounds := 100
for i := 0; i < rounds; i++ {
SubtestSwarm(t, swarms, msgs)
<-time.After(10 * time.Millisecond)
}
}
// Package swarm implements a connection muxer with a pair of channels
// to synchronize all network communication.
package swarm
import (
"context"
"fmt"
"io/ioutil"
"os"
"strings"
"sync"
"time"
logging "github.com/ipfs/go-log"
ps "github.com/jbenet/go-peerstream"
pst "github.com/jbenet/go-stream-muxer"
"github.com/jbenet/goprocess"
goprocessctx "github.com/jbenet/goprocess/context"
addrutil "github.com/libp2p/go-addr-util"
conn "github.com/libp2p/go-libp2p-conn"
ci "github.com/libp2p/go-libp2p-crypto"
metrics "github.com/libp2p/go-libp2p-metrics"
mconn "github.com/libp2p/go-libp2p-metrics/conn"
inet "github.com/libp2p/go-libp2p-net"
peer "github.com/libp2p/go-libp2p-peer"
pstore "github.com/libp2p/go-libp2p-peerstore"
transport "github.com/libp2p/go-libp2p-transport"
filter "github.com/libp2p/go-maddr-filter"
tcpt "github.com/libp2p/go-tcp-transport"
ma "github.com/multiformats/go-multiaddr"
psmss "github.com/whyrusleeping/go-smux-multistream"
spdy "github.com/whyrusleeping/go-smux-spdystream"
yamux "github.com/whyrusleeping/go-smux-yamux"
mafilter "github.com/whyrusleeping/multiaddr-filter"
ws "github.com/whyrusleeping/ws-transport"
)
var log = logging.Logger("swarm2")
// PSTransport is the default peerstream transport that will be used by
// any libp2p swarms.
var PSTransport pst.Transport
func init() {
msstpt := psmss.NewBlankTransport()
ymxtpt := &yamux.Transport{
AcceptBacklog: 8192,
ConnectionWriteTimeout: time.Second * 10,
KeepAliveInterval: time.Second * 30,
EnableKeepAlive: true,
MaxStreamWindowSize: uint32(1024 * 512),
LogOutput: ioutil.Discard,
}
msstpt.AddTransport("/yamux/1.0.0", ymxtpt)
msstpt.AddTransport("/spdy/3.1.0", spdy.Transport)
// allow overriding of muxer preferences
if prefs := os.Getenv("LIBP2P_MUX_PREFS"); prefs != "" {
msstpt.OrderPreference = strings.Fields(prefs)
}
PSTransport = msstpt
}
// Swarm is a connection muxer, allowing connections to other peers to
// be opened and closed, while still using the same Chan for all
// communication. The Chan sends/receives Messages, which note the
// destination or source Peer.
//
// Uses peerstream.Swarm
type Swarm struct {
swarm *ps.Swarm
local peer.ID
peers pstore.Peerstore
connh ConnHandler
dsync *DialSync
backf dialbackoff
dialT time.Duration // mainly for tests
dialer *conn.Dialer
notifmu sync.RWMutex
notifs map[inet.Notifiee]ps.Notifiee
transports []transport.Transport
// filters for addresses that shouldnt be dialed
Filters *filter.Filters
// file descriptor rate limited
fdRateLimit chan struct{}
proc goprocess.Process
ctx context.Context
bwc metrics.Reporter
limiter *dialLimiter
}
// NewSwarm constructs a Swarm, with a Chan.
func NewSwarm(ctx context.Context, listenAddrs []ma.Multiaddr,
local peer.ID, peers pstore.Peerstore, bwc metrics.Reporter) (*Swarm, error) {
listenAddrs, err := filterAddrs(listenAddrs)
if err != nil {
return nil, err
}
var wrap func(c transport.Conn) transport.Conn
if bwc != nil {
wrap = func(c transport.Conn) transport.Conn {
return mconn.WrapConn(bwc, c)
}
}
s := &Swarm{
swarm: ps.NewSwarm(PSTransport),
local: local,
peers: peers,
ctx: ctx,
dialT: DialTimeout,
notifs: make(map[inet.Notifiee]ps.Notifiee),
transports: []transport.Transport{
tcpt.NewTCPTransport(),
new(ws.WebsocketTransport),
},
bwc: bwc,
fdRateLimit: make(chan struct{}, concurrentFdDials),
Filters: filter.NewFilters(),
dialer: conn.NewDialer(local, peers.PrivKey(local), wrap),
}
s.dsync = NewDialSync(s.doDial)
s.limiter = newDialLimiter(s.dialAddr)
// configure Swarm
s.proc = goprocessctx.WithContextAndTeardown(ctx, s.teardown)
s.SetConnHandler(nil) // make sure to setup our own conn handler.
err = s.setupInterfaces(listenAddrs)
if err != nil {
return nil, err
}
return s, nil
}
func NewBlankSwarm(ctx context.Context, id peer.ID, privkey ci.PrivKey, pstpt pst.Transport) *Swarm {
s := &Swarm{
swarm: ps.NewSwarm(pstpt),
local: id,
peers: pstore.NewPeerstore(),
ctx: ctx,
dialT: DialTimeout,
notifs: make(map[inet.Notifiee]ps.Notifiee),
fdRateLimit: make(chan struct{}, concurrentFdDials),
Filters: filter.NewFilters(),
dialer: conn.NewDialer(id, privkey, nil),
}
// configure Swarm
s.limiter = newDialLimiter(s.dialAddr)
s.proc = goprocessctx.WithContextAndTeardown(ctx, s.teardown)
s.SetConnHandler(nil) // make sure to setup our own conn handler.
return s
}
func (s *Swarm) AddTransport(t transport.Transport) {
s.transports = append(s.transports, t)
}
func (s *Swarm) teardown() error {
return s.swarm.Close()
}
// AddAddrFilter adds a multiaddr filter to the set of filters the swarm will
// use to determine which addresses not to dial to.
func (s *Swarm) AddAddrFilter(f string) error {
m, err := mafilter.NewMask(f)
if err != nil {
return err
}
s.Filters.AddDialFilter(m)
return nil
}
func filterAddrs(listenAddrs []ma.Multiaddr) ([]ma.Multiaddr, error) {
if len(listenAddrs) > 0 {
filtered := addrutil.FilterUsableAddrs(listenAddrs)
if len(filtered) < 1 {
return nil, fmt.Errorf("swarm cannot use any addr in: %s", listenAddrs)
}
listenAddrs = filtered
}
return listenAddrs, nil
}
// Listen sets up listeners for all of the given addresses
func (s *Swarm) Listen(addrs ...ma.Multiaddr) error {
addrs, err := filterAddrs(addrs)
if err != nil {
return err
}
return s.setupInterfaces(addrs)
}
// Process returns the Process of the swarm
func (s *Swarm) Process() goprocess.Process {
return s.proc
}
// Context returns the context of the swarm
func (s *Swarm) Context() context.Context {
return s.ctx
}
// Close stops the Swarm.
func (s *Swarm) Close() error {
return s.proc.Close()
}
// StreamSwarm returns the underlying peerstream.Swarm
func (s *Swarm) StreamSwarm() *ps.Swarm {
return s.swarm
}
// SetConnHandler assigns the handler for new connections.
// See peerstream. You will rarely use this. See SetStreamHandler
func (s *Swarm) SetConnHandler(handler ConnHandler) {
// handler is nil if user wants to clear the old handler.
if handler == nil {
s.swarm.SetConnHandler(func(psconn *ps.Conn) {
s.connHandler(psconn)
})
return
}
s.swarm.SetConnHandler(func(psconn *ps.Conn) {
// sc is nil if closed in our handler.
if sc := s.connHandler(psconn); sc != nil {
// call the user's handler. in a goroutine for sync safety.
go handler(sc)
}
})
}
// SetStreamHandler assigns the handler for new streams.
// See peerstream.
func (s *Swarm) SetStreamHandler(handler inet.StreamHandler) {
s.swarm.SetStreamHandler(func(s *ps.Stream) {
handler(wrapStream(s))
})
}
// NewStreamWithPeer creates a new stream on any available connection to p
func (s *Swarm) NewStreamWithPeer(ctx context.Context, p peer.ID) (*Stream, error) {
// if we have no connections, try connecting.
if len(s.ConnectionsToPeer(p)) == 0 {
log.Debug("Swarm: NewStreamWithPeer no connections. Attempting to connect...")
if _, err := s.Dial(ctx, p); err != nil {
return nil, err
}
}
log.Debug("Swarm: NewStreamWithPeer...")
// TODO: think about passing a context down to NewStreamWithGroup
st, err := s.swarm.NewStreamWithGroup(p)
return wrapStream(st), err
}
// StreamsWithPeer returns all the live Streams to p
func (s *Swarm) StreamsWithPeer(p peer.ID) []*Stream {
return wrapStreams(ps.StreamsWithGroup(p, s.swarm.Streams()))
}
// ConnectionsToPeer returns all the live connections to p
func (s *Swarm) ConnectionsToPeer(p peer.ID) []*Conn {
return wrapConns(ps.ConnsWithGroup(p, s.swarm.Conns()))
}
func (s *Swarm) HaveConnsToPeer(p peer.ID) bool {
for _, c := range s.swarm.Conns() {
if c.InGroup(p) {
return true
}
}
return false
}
// Connections returns a slice of all connections.
func (s *Swarm) Connections() []*Conn {
return wrapConns(s.swarm.Conns())
}
// CloseConnection removes a given peer from swarm + closes the connection
func (s *Swarm) CloseConnection(p peer.ID) error {
conns := s.swarm.ConnsWithGroup(p) // boom.
for _, c := range conns {
c.Close()
}
return nil
}
// Peers returns a copy of the set of peers swarm is connected to.
func (s *Swarm) Peers() []peer.ID {
conns := s.Connections()
seen := make(map[peer.ID]struct{})
peers := make([]peer.ID, 0, len(conns))
for _, c := range conns {
p := c.RemotePeer()
if _, found := seen[p]; found {
continue
}
seen[p] = struct{}{}
peers = append(peers, p)
}
return peers
}
// LocalPeer returns the local peer swarm is associated to.
func (s *Swarm) LocalPeer() peer.ID {
return s.local
}
// Backoff returns the dialbackoff object for this swarm.
func (s *Swarm) Backoff() *dialbackoff {
return &s.backf
}
// notifyAll sends a signal to all Notifiees
func (s *Swarm) notifyAll(notify func(inet.Notifiee)) {
s.notifmu.RLock()
for f := range s.notifs {
go notify(f)
}
s.notifmu.RUnlock()
}
// Notify signs up Notifiee to receive signals when events happen
func (s *Swarm) Notify(f inet.Notifiee) {
// wrap with our notifiee, to translate function calls
n := &ps2netNotifee{net: (*Network)(s), not: f}
s.notifmu.Lock()
s.notifs[f] = n
s.notifmu.Unlock()
// register for notifications in the peer swarm.
s.swarm.Notify(n)
}
// StopNotify unregisters Notifiee fromr receiving signals
func (s *Swarm) StopNotify(f inet.Notifiee) {
s.notifmu.Lock()
n, found := s.notifs[f]
if found {
delete(s.notifs, f)
}
s.notifmu.Unlock()
if found {
s.swarm.StopNotify(n)
}
}
type ps2netNotifee struct {
net *Network
not inet.Notifiee
}
func (n *ps2netNotifee) Connected(c *ps.Conn) {
n.not.Connected(n.net, inet.Conn((*Conn)(c)))
}
func (n *ps2netNotifee) Disconnected(c *ps.Conn) {
n.not.Disconnected(n.net, inet.Conn((*Conn)(c)))
}
func (n *ps2netNotifee) OpenedStream(s *ps.Stream) {
n.not.OpenedStream(n.net, &Stream{stream: s})
}
func (n *ps2netNotifee) ClosedStream(s *ps.Stream) {
n.not.ClosedStream(n.net, &Stream{stream: s})
}
package swarm
import (
addrutil "github.com/libp2p/go-addr-util"
iconn "github.com/libp2p/go-libp2p-interface-conn"
ma "github.com/multiformats/go-multiaddr"
)
// ListenAddresses returns a list of addresses at which this swarm listens.
func (s *Swarm) ListenAddresses() []ma.Multiaddr {
listeners := s.swarm.Listeners()
addrs := make([]ma.Multiaddr, 0, len(listeners))
for _, l := range listeners {
if l2, ok := l.NetListener().(iconn.Listener); ok {
addrs = append(addrs, l2.Multiaddr())
}
}
return addrs
}
// InterfaceListenAddresses returns a list of addresses at which this swarm
// listens. It expands "any interface" addresses (/ip4/0.0.0.0, /ip6/::) to
// use the known local interfaces.
func (s *Swarm) InterfaceListenAddresses() ([]ma.Multiaddr, error) {
return addrutil.ResolveUnspecifiedAddresses(s.ListenAddresses(), nil)
}
package swarm
import (
"context"
"testing"
addrutil "github.com/libp2p/go-addr-util"
metrics "github.com/libp2p/go-libp2p-metrics"
pstore "github.com/libp2p/go-libp2p-peerstore"
testutil "github.com/libp2p/go-testutil"
ma "github.com/multiformats/go-multiaddr"
)
func TestFilterAddrs(t *testing.T) {
m := func(s string) ma.Multiaddr {
maddr, err := ma.NewMultiaddr(s)
if err != nil {
t.Fatal(err)
}
return maddr
}
bad := []ma.Multiaddr{
m("/ip4/1.2.3.4/udp/1234"), // unreliable
m("/ip4/1.2.3.4/udp/1234/sctp/1234"), // not in manet
m("/ip4/1.2.3.4/udp/1234/udt"), // udt is broken on arm
m("/ip6/fe80::1/tcp/0"), // link local
m("/ip6/fe80::100/tcp/1234"), // link local
}
good := []ma.Multiaddr{
m("/ip4/127.0.0.1/tcp/0"),
m("/ip6/::1/tcp/0"),
m("/ip4/1.2.3.4/udp/1234/utp"),
}
goodAndBad := append(good, bad...)
// test filters
for _, a := range bad {
if addrutil.AddrUsable(a, false) {
t.Errorf("addr %s should be unusable", a)
}
}
for _, a := range good {
if !addrutil.AddrUsable(a, false) {
t.Errorf("addr %s should be usable", a)
}
}
subtestAddrsEqual(t, addrutil.FilterUsableAddrs(bad), []ma.Multiaddr{})
subtestAddrsEqual(t, addrutil.FilterUsableAddrs(good), good)
subtestAddrsEqual(t, addrutil.FilterUsableAddrs(goodAndBad), good)
// now test it with swarm
id, err := testutil.RandPeerID()
if err != nil {
t.Fatal(err)
}
ps := pstore.NewPeerstore()
ctx := context.Background()
if _, err := NewNetwork(ctx, bad, id, ps, metrics.NewBandwidthCounter()); err == nil {
t.Fatal("should have failed to create swarm")
}
if _, err := NewNetwork(ctx, goodAndBad, id, ps, metrics.NewBandwidthCounter()); err != nil {
t.Fatal("should have succeeded in creating swarm", err)
}
}
func subtestAddrsEqual(t *testing.T, a, b []ma.Multiaddr) {
if len(a) != len(b) {
t.Error(t)
}
in := func(addr ma.Multiaddr, l []ma.Multiaddr) bool {
for _, addr2 := range l {
if addr.Equal(addr2) {
return true
}
}
return false
}
for _, aa := range a {
if !in(aa, b) {
t.Errorf("%s not in %s", aa, b)
}
}
}
func TestDialBadAddrs(t *testing.T) {
m := func(s string) ma.Multiaddr {
maddr, err := ma.NewMultiaddr(s)
if err != nil {
t.Fatal(err)
}
return maddr
}
ctx := context.Background()
s := makeSwarms(ctx, t, 1)[0]
test := func(a ma.Multiaddr) {
p := testutil.RandPeerIDFatal(t)
s.peers.AddAddr(p, a, pstore.PermanentAddrTTL)
if _, err := s.Dial(ctx, p); err == nil {
t.Errorf("swarm should not dial: %s", p)
}
}
test(m("/ip6/fe80::1")) // link local
test(m("/ip6/fe80::100")) // link local
test(m("/ip4/127.0.0.1/udp/1234/utp")) // utp
}
package swarm
import (
"context"
"fmt"
ps "github.com/jbenet/go-peerstream"
ic "github.com/libp2p/go-libp2p-crypto"
iconn "github.com/libp2p/go-libp2p-interface-conn"
inet "github.com/libp2p/go-libp2p-net"
peer "github.com/libp2p/go-libp2p-peer"
ma "github.com/multiformats/go-multiaddr"
)
// Conn is a simple wrapper around a ps.Conn that also exposes
// some of the methods from the underlying conn.Conn.
// There's **five** "layers" to each connection:
// * 0. the net.Conn - underlying net.Conn (TCP/UDP/UTP/etc)
// * 1. the manet.Conn - provides multiaddr friendly Conn
// * 2. the conn.Conn - provides Peer friendly Conn (inc Secure channel)
// * 3. the peerstream.Conn - provides peerstream / spdysptream happiness
// * 4. the Conn - abstracts everyting out, exposing only key parts of underlying layers
// (I know, this is kinda crazy. it's more historical than a good design. though the
// layers do build up pieces of functionality. and they're all just io.RW :) )
type Conn ps.Conn
// ConnHandler is called when new conns are opened from remote peers.
// See peerstream.ConnHandler
type ConnHandler func(*Conn)
func (c *Conn) StreamConn() *ps.Conn {
return (*ps.Conn)(c)
}
func (c *Conn) RawConn() iconn.Conn {
// righly panic if these things aren't true. it is an expected
// invariant that these Conns are all of the typewe expect:
// ps.Conn wrapping a conn.Conn
// if we get something else it is programmer error.
return (*ps.Conn)(c).NetConn().(iconn.Conn)
}
func (c *Conn) String() string {
return fmt.Sprintf("<SwarmConn %s>", c.RawConn())
}
// LocalMultiaddr is the Multiaddr on this side
func (c *Conn) LocalMultiaddr() ma.Multiaddr {
return c.RawConn().LocalMultiaddr()
}
// LocalPeer is the Peer on our side of the connection
func (c *Conn) LocalPeer() peer.ID {
return c.RawConn().LocalPeer()
}
// RemoteMultiaddr is the Multiaddr on the remote side
func (c *Conn) RemoteMultiaddr() ma.Multiaddr {
return c.RawConn().RemoteMultiaddr()
}
// RemotePeer is the Peer on the remote side
func (c *Conn) RemotePeer() peer.ID {
return c.RawConn().RemotePeer()
}
// LocalPrivateKey is the public key of the peer on this side
func (c *Conn) LocalPrivateKey() ic.PrivKey {
return c.RawConn().LocalPrivateKey()
}
// RemotePublicKey is the public key of the peer on the remote side
func (c *Conn) RemotePublicKey() ic.PubKey {
return c.RawConn().RemotePublicKey()
}
// NewSwarmStream returns a new Stream from this connection
func (c *Conn) NewSwarmStream() (*Stream, error) {
s, err := c.StreamConn().NewStream()
return wrapStream(s), err
}
// NewStream returns a new Stream from this connection
func (c *Conn) NewStream() (inet.Stream, error) {
s, err := c.NewSwarmStream()
return inet.Stream(s), err
}
// Close closes the underlying stream connection
func (c *Conn) Close() error {
return c.StreamConn().Close()
}
func wrapConn(psc *ps.Conn) (*Conn, error) {
// grab the underlying connection.
if _, ok := psc.NetConn().(iconn.Conn); !ok {
// this should never happen. if we see it ocurring it means that we added
// a Listener to the ps.Swarm that is NOT one of our net/conn.Listener.
return nil, fmt.Errorf("swarm connHandler: invalid conn (not a conn.Conn): %s", psc)
}
return (*Conn)(psc), nil
}
// wrapConns returns a *Conn for all these ps.Conns
func wrapConns(conns1 []*ps.Conn) []*Conn {
conns2 := make([]*Conn, len(conns1))
for i, c1 := range conns1 {
if c2, err := wrapConn(c1); err == nil {
conns2[i] = c2
}
}
return conns2
}
// newConnSetup does the swarm's "setup" for a connection. returns the underlying
// conn.Conn this method is used by both swarm.Dial and ps.Swarm connHandler
func (s *Swarm) newConnSetup(ctx context.Context, psConn *ps.Conn) (*Conn, error) {
// wrap with a Conn
sc, err := wrapConn(psConn)
if err != nil {
return nil, err
}
// if we have a public key, make sure we add it to our peerstore!
// This is an important detail. Otherwise we must fetch the public
// key from the DHT or some other system.
if pk := sc.RemotePublicKey(); pk != nil {
s.peers.AddPubKey(sc.RemotePeer(), pk)
}
// ok great! we can use it. add it to our group.
// set the RemotePeer as a group on the conn. this lets us group
// connections in the StreamSwarm by peer, and get a streams from
// any available connection in the group (better multiconn):
// swarm.StreamSwarm().NewStreamWithGroup(remotePeer)
psConn.AddGroup(sc.RemotePeer())
return sc, nil
}
package swarm
import (
"context"
"errors"
"fmt"
"sync"
"time"
addrutil "github.com/libp2p/go-addr-util"
iconn "github.com/libp2p/go-libp2p-interface-conn"
lgbl "github.com/libp2p/go-libp2p-loggables"
peer "github.com/libp2p/go-libp2p-peer"
ma "github.com/multiformats/go-multiaddr"
)
// Diagram of dial sync:
//
// many callers of Dial() synched w. dials many addrs results to callers
// ----------------------\ dialsync use earliest /--------------
// -----------------------\ |----------\ /----------------
// ------------------------>------------<------- >---------<-----------------
// -----------------------| \----x \----------------
// ----------------------| \-----x \---------------
// any may fail if no addr at end
// retry dialAttempt x
var (
// ErrDialBackoff is returned by the backoff code when a given peer has
// been dialed too frequently
ErrDialBackoff = errors.New("dial backoff")
// ErrDialFailed is returned when connecting to a peer has ultimately failed
ErrDialFailed = errors.New("dial attempt failed")
// ErrDialToSelf is returned if we attempt to dial our own peer
ErrDialToSelf = errors.New("dial to self attempted")
)
// dialAttempts governs how many times a goroutine will try to dial a given peer.
// Note: this is down to one, as we have _too many dials_ atm. To add back in,
// add loop back in Dial(.)
const dialAttempts = 1
// number of concurrent outbound dials over transports that consume file descriptors
const concurrentFdDials = 160
// number of concurrent outbound dials to make per peer
const defaultPerPeerRateLimit = 8
// DialTimeout is the amount of time each dial attempt has. We can think about making
// this larger down the road, or putting more granular timeouts (i.e. within each
// subcomponent of Dial)
var DialTimeout = time.Second * 10
// dialbackoff is a struct used to avoid over-dialing the same, dead peers.
// Whenever we totally time out on a peer (all three attempts), we add them
// to dialbackoff. Then, whenevers goroutines would _wait_ (dialsync), they
// check dialbackoff. If it's there, they don't wait and exit promptly with
// an error. (the single goroutine that is actually dialing continues to
// dial). If a dial is successful, the peer is removed from backoff.
// Example:
//
// for {
// if ok, wait := dialsync.Lock(p); !ok {
// if backoff.Backoff(p) {
// return errDialFailed
// }
// <-wait
// continue
// }
// defer dialsync.Unlock(p)
// c, err := actuallyDial(p)
// if err != nil {
// dialbackoff.AddBackoff(p)
// continue
// }
// dialbackoff.Clear(p)
// }
//
type dialbackoff struct {
entries map[peer.ID]*backoffPeer
lock sync.RWMutex
}
type backoffPeer struct {
tries int
until time.Time
}
func (db *dialbackoff) init() {
if db.entries == nil {
db.entries = make(map[peer.ID]*backoffPeer)
}
}
// Backoff returns whether the client should backoff from dialing
// peer p
func (db *dialbackoff) Backoff(p peer.ID) (backoff bool) {
db.lock.Lock()
defer db.lock.Unlock()
db.init()
bp, found := db.entries[p]
if found && time.Now().Before(bp.until) {
return true
}
return false
}
const baseBackoffTime = time.Second * 5
const maxBackoffTime = time.Minute * 5
// AddBackoff lets other nodes know that we've entered backoff with
// peer p, so dialers should not wait unnecessarily. We still will
// attempt to dial with one goroutine, in case we get through.
func (db *dialbackoff) AddBackoff(p peer.ID) {
db.lock.Lock()
defer db.lock.Unlock()
db.init()
bp, ok := db.entries[p]
if !ok {
db.entries[p] = &backoffPeer{
tries: 1,
until: time.Now().Add(baseBackoffTime),
}
return
}
expTimeAdd := time.Second * time.Duration(bp.tries*bp.tries)
if expTimeAdd > maxBackoffTime {
expTimeAdd = maxBackoffTime
}
bp.until = time.Now().Add(baseBackoffTime + expTimeAdd)
bp.tries++
}
// Clear removes a backoff record. Clients should call this after a
// successful Dial.
func (db *dialbackoff) Clear(p peer.ID) {
db.lock.Lock()
defer db.lock.Unlock()
db.init()
delete(db.entries, p)
}
// Dial connects to a peer.
//
// The idea is that the client of Swarm does not need to know what network
// the connection will happen over. Swarm can use whichever it choses.
// This allows us to use various transport protocols, do NAT traversal/relay,
// etc. to achive connection.
func (s *Swarm) Dial(ctx context.Context, p peer.ID) (*Conn, error) {
var logdial = lgbl.Dial("swarm", s.LocalPeer(), p, nil, nil)
if p == s.local {
log.Event(ctx, "swarmDialSelf", logdial)
return nil, ErrDialToSelf
}
return s.gatedDialAttempt(ctx, p)
}
func (s *Swarm) bestConnectionToPeer(p peer.ID) *Conn {
cs := s.ConnectionsToPeer(p)
for _, conn := range cs {
if conn != nil { // dump out the first one we find. (TODO pick better)
return conn
}
}
return nil
}
// gatedDialAttempt is an attempt to dial a node. It is gated by the swarm's
// dial synchronization systems: dialsync and dialbackoff.
func (s *Swarm) gatedDialAttempt(ctx context.Context, p peer.ID) (*Conn, error) {
defer log.EventBegin(ctx, "swarmDialAttemptSync", p).Done()
// check if we already have an open connection first
conn := s.bestConnectionToPeer(p)
if conn != nil {
return conn, nil
}
// if this peer has been backed off, lets get out of here
if s.backf.Backoff(p) {
log.Event(ctx, "swarmDialBackoff", p)
return nil, ErrDialBackoff
}
return s.dsync.DialLock(ctx, p)
}
// doDial is an ugly shim method to retain all the logging and backoff logic
// of the old dialsync code
func (s *Swarm) doDial(ctx context.Context, p peer.ID) (*Conn, error) {
var logdial = lgbl.Dial("swarm", s.LocalPeer(), p, nil, nil)
// ok, we have been charged to dial! let's do it.
// if it succeeds, dial will add the conn to the swarm itself.
defer log.EventBegin(ctx, "swarmDialAttemptStart", logdial).Done()
ctxT, cancel := context.WithTimeout(ctx, s.dialT)
conn, err := s.dial(ctxT, p)
cancel()
log.Debugf("dial end %s", conn)
if err != nil {
log.Event(ctx, "swarmDialBackoffAdd", logdial)
s.backf.AddBackoff(p) // let others know to backoff
// ok, we failed. try again. (if loop is done, our error is output)
return nil, fmt.Errorf("dial attempt failed: %s", err)
}
log.Event(ctx, "swarmDialBackoffClear", logdial)
s.backf.Clear(p) // okay, no longer need to backoff
return conn, nil
}
// dial is the actual swarm's dial logic, gated by Dial.
func (s *Swarm) dial(ctx context.Context, p peer.ID) (*Conn, error) {
var logdial = lgbl.Dial("swarm", s.LocalPeer(), p, nil, nil)
if p == s.local {
log.Event(ctx, "swarmDialDoDialSelf", logdial)
return nil, ErrDialToSelf
}
defer log.EventBegin(ctx, "swarmDialDo", logdial).Done()
logdial["dial"] = "failure" // start off with failure. set to "success" at the end.
sk := s.peers.PrivKey(s.local)
logdial["encrypted"] = (sk != nil) // log wether this will be an encrypted dial or not.
if sk == nil {
// fine for sk to be nil, just log.
log.Debug("Dial not given PrivateKey, so WILL NOT SECURE conn.")
}
ila, _ := s.InterfaceListenAddresses()
subtractFilter := addrutil.SubtractFilter(append(ila, s.peers.Addrs(s.local)...)...)
// get live channel of addresses for peer, filtered by the given filters
/*
remoteAddrChan := s.peers.AddrsChan(ctx, p,
addrutil.AddrUsableFilter,
subtractFilter,
s.Filters.AddrBlocked)
*/
//////
/*
This code is temporary, the peerstore can currently provide
a channel as an interface for receiving addresses, but more thought
needs to be put into the execution. For now, this allows us to use
the improved rate limiter, while maintaining the outward behaviour
that we previously had (halting a dial when we run out of addrs)
*/
paddrs := s.peers.Addrs(p)
goodAddrs := addrutil.FilterAddrs(paddrs,
addrutil.AddrUsableFunc,
subtractFilter,
addrutil.FilterNeg(s.Filters.AddrBlocked),
)
remoteAddrChan := make(chan ma.Multiaddr, len(goodAddrs))
for _, a := range goodAddrs {
remoteAddrChan <- a
}
close(remoteAddrChan)
/////////
// try to get a connection to any addr
connC, err := s.dialAddrs(ctx, p, remoteAddrChan)
if err != nil {
logdial["error"] = err
return nil, err
}
logdial["netconn"] = lgbl.NetConn(connC)
// ok try to setup the new connection.
defer log.EventBegin(ctx, "swarmDialDoSetup", logdial, lgbl.NetConn(connC)).Done()
swarmC, err := dialConnSetup(ctx, s, connC)
if err != nil {
logdial["error"] = err
connC.Close() // close the connection. didn't work out :(
return nil, err
}
logdial["dial"] = "success"
return swarmC, nil
}
func (s *Swarm) dialAddrs(ctx context.Context, p peer.ID, remoteAddrs <-chan ma.Multiaddr) (iconn.Conn, error) {
log.Debugf("%s swarm dialing %s %s", s.local, p, remoteAddrs)
ctx, cancel := context.WithCancel(ctx)
defer cancel() // cancel work when we exit func
// use a single response type instead of errs and conns, reduces complexity *a ton*
respch := make(chan dialResult)
defaultDialFail := fmt.Errorf("failed to dial %s (default failure)", p)
exitErr := defaultDialFail
var active int
for {
select {
case addr, ok := <-remoteAddrs:
if !ok {
remoteAddrs = nil
if active == 0 {
return nil, exitErr
}
continue
}
s.limitedDial(ctx, p, addr, respch)
active++
case <-ctx.Done():
if exitErr == defaultDialFail {
exitErr = ctx.Err()
}
return nil, exitErr
case resp := <-respch:
active--
if resp.Err != nil {
log.Info("got error on dial: ", resp.Err)
// Errors are normal, lots of dials will fail
exitErr = resp.Err
if remoteAddrs == nil && active == 0 {
return nil, exitErr
}
} else if resp.Conn != nil {
return resp.Conn, nil
}
}
}
}
// limitedDial will start a dial to the given peer when
// it is able, respecting the various different types of rate
// limiting that occur without using extra goroutines per addr
func (s *Swarm) limitedDial(ctx context.Context, p peer.ID, a ma.Multiaddr, resp chan dialResult) {
s.limiter.AddDialJob(&dialJob{
addr: a,
peer: p,
resp: resp,
ctx: ctx,
})
}
func (s *Swarm) dialAddr(ctx context.Context, p peer.ID, addr ma.Multiaddr) (iconn.Conn, error) {
log.Debugf("%s swarm dialing %s %s", s.local, p, addr)
connC, err := s.dialer.Dial(ctx, addr, p)
if err != nil {
return nil, fmt.Errorf("%s --> %s dial attempt failed: %s", s.local, p, err)
}
// if the connection is not to whom we thought it would be...
remotep := connC.RemotePeer()
if remotep != p {
connC.Close()
_, err := connC.Read(nil) // should return any potential errors (ex: from secio)
return nil, fmt.Errorf("misdial to %s through %s (got %s): %s", p, addr, remotep, err)
}
// if the connection is to ourselves...
// this can happen TONS when Loopback addrs are advertized.
// (this should be caught by two checks above, but let's just make sure.)
if remotep == s.local {
connC.Close()
return nil, fmt.Errorf("misdial to %s through %s (got self)", p, addr)
}
// success! we got one!
return connC, nil
}
var ConnSetupTimeout = time.Minute * 5
// dialConnSetup is the setup logic for a connection from the dial side. it
// needs to add the Conn to the StreamSwarm, then run newConnSetup
func dialConnSetup(ctx context.Context, s *Swarm, connC iconn.Conn) (*Conn, error) {
deadline, ok := ctx.Deadline()
if !ok {
deadline = time.Now().Add(ConnSetupTimeout)
}
if err := connC.SetDeadline(deadline); err != nil {
return nil, err
}
psC, err := s.swarm.AddConn(connC)
if err != nil {
// connC is closed by caller if we fail.
return nil, fmt.Errorf("failed to add conn to ps.Swarm: %s", err)
}
// ok try to setup the new connection. (newConnSetup will add to group)
swarmC, err := s.newConnSetup(ctx, psC)
if err != nil {
psC.Close() // we need to make sure psC is Closed.
return nil, err
}
if err := connC.SetDeadline(time.Time{}); err != nil {
log.Error("failed to reset connection deadline after setup: ", err)
return nil, err
}
return swarmC, err
}
package swarm
import (
"context"
"fmt"
ps "github.com/jbenet/go-peerstream"
conn "github.com/libp2p/go-libp2p-conn"
iconn "github.com/libp2p/go-libp2p-interface-conn"
lgbl "github.com/libp2p/go-libp2p-loggables"
mconn "github.com/libp2p/go-libp2p-metrics/conn"
inet "github.com/libp2p/go-libp2p-net"
transport "github.com/libp2p/go-libp2p-transport"
ma "github.com/multiformats/go-multiaddr"
)
func (s *Swarm) AddListenAddr(a ma.Multiaddr) error {
tpt := s.transportForAddr(a)
if tpt == nil {
return fmt.Errorf("no transport for address: %s", a)
}
d, err := tpt.Dialer(a, transport.TimeoutOpt(DialTimeout), transport.ReusePorts)
if err != nil {
return err
}
s.dialer.AddDialer(d)
list, err := tpt.Listen(a)
if err != nil {
return err
}
err = s.addListener(list)
if err != nil {
return err
}
return nil
}
// Open listeners and reuse-dialers for the given addresses
func (s *Swarm) setupInterfaces(addrs []ma.Multiaddr) error {
errs := make([]error, len(addrs))
var succeeded int
for i, a := range addrs {
if err := s.AddListenAddr(a); err != nil {
errs[i] = err
} else {
succeeded++
}
}
for i, e := range errs {
if e != nil {
log.Warning("listen on %s failed: %s", addrs[i], errs[i])
}
}
if succeeded == 0 && len(addrs) > 0 {
return fmt.Errorf("failed to listen on any addresses: %s", errs)
}
return nil
}
func (s *Swarm) transportForAddr(a ma.Multiaddr) transport.Transport {
for _, t := range s.transports {
if t.Matches(a) {
return t
}
}
return nil
}
func (s *Swarm) addListener(tptlist transport.Listener) error {
sk := s.peers.PrivKey(s.local)
if sk == nil {
// may be fine for sk to be nil, just log a warning.
log.Warning("Listener not given PrivateKey, so WILL NOT SECURE conns.")
}
list, err := conn.WrapTransportListener(s.Context(), tptlist, s.local, sk)
if err != nil {
return err
}
list.SetAddrFilters(s.Filters)
if cw, ok := list.(conn.ListenerConnWrapper); ok && s.bwc != nil {
cw.SetConnWrapper(func(c transport.Conn) transport.Conn {
return mconn.WrapConn(s.bwc, c)
})
}
return s.addConnListener(list)
}
func (s *Swarm) addConnListener(list iconn.Listener) error {
// AddListener to the peerstream Listener. this will begin accepting connections
// and streams!
sl, err := s.swarm.AddListener(list)
if err != nil {
return err
}
log.Debugf("Swarm Listeners at %s", s.ListenAddresses())
maddr := list.Multiaddr()
// signal to our notifiees on successful conn.
s.notifyAll(func(n inet.Notifiee) {
n.Listen((*Network)(s), maddr)
})
// go consume peerstream's listen accept errors. note, these ARE errors.
// they may be killing the listener, and if we get _any_ we should be
// fixing this in our conn.Listener (to ignore them or handle them
// differently.)
go func(ctx context.Context, sl *ps.Listener) {
// signal to our notifiees closing
defer s.notifyAll(func(n inet.Notifiee) {
n.ListenClose((*Network)(s), maddr)
})
for {
select {
case err, more := <-sl.AcceptErrors():
if !more {
return
}
log.Warningf("swarm listener accept error: %s", err)
case <-ctx.Done():
return
}
}
}(s.Context(), sl)
return nil
}
// connHandler is called by the StreamSwarm whenever a new connection is added
// here we configure it slightly. Note that this is sequential, so if anything
// will take a while do it in a goroutine.
// See https://godoc.org/github.com/jbenet/go-peerstream for more information
func (s *Swarm) connHandler(c *ps.Conn) *Conn {
ctx := context.Background()
// this context is for running the handshake, which -- when receiveing connections
// -- we have no bound on beyond what the transport protocol bounds it at.
// note that setup + the handshake are bounded by underlying io.
// (i.e. if TCP or UDP disconnects (or the swarm closes), we're done.
// Q: why not have a shorter handshake? think about an HTTP server on really slow conns.
// as long as the conn is live (TCP says its online), it tries its best. we follow suit.)
sc, err := s.newConnSetup(ctx, c)
if err != nil {
log.Debug(err)
log.Event(ctx, "newConnHandlerDisconnect", lgbl.NetConn(c.NetConn()), lgbl.Error(err))
c.Close() // boom. close it.
return nil
}
// if a peer dials us, remove from dial backoff.
s.backf.Clear(sc.RemotePeer())
return sc
}
package swarm
import (
"context"
"fmt"
"github.com/jbenet/goprocess"
metrics "github.com/libp2p/go-libp2p-metrics"
inet "github.com/libp2p/go-libp2p-net"
peer "github.com/libp2p/go-libp2p-peer"
pstore "github.com/libp2p/go-libp2p-peerstore"
ma "github.com/multiformats/go-multiaddr"
)
// Network implements the inet.Network interface.
// It is simply a swarm, with a few different functions
// to implement inet.Network.
type Network Swarm
// NewNetwork constructs a new network and starts listening on given addresses.
func NewNetwork(ctx context.Context, listen []ma.Multiaddr, local peer.ID,
peers pstore.Peerstore, bwc metrics.Reporter) (*Network, error) {
s, err := NewSwarm(ctx, listen, local, peers, bwc)
if err != nil {
return nil, err
}
return (*Network)(s), nil
}
// DialPeer attempts to establish a connection to a given peer.
// Respects the context.
func (n *Network) DialPeer(ctx context.Context, p peer.ID) (inet.Conn, error) {
log.Debugf("[%s] network dialing peer [%s]", n.local, p)
sc, err := n.Swarm().Dial(ctx, p)
if err != nil {
return nil, err
}
log.Debugf("network for %s finished dialing %s", n.local, p)
return inet.Conn(sc), nil
}
// Process returns the network's Process
func (n *Network) Process() goprocess.Process {
return n.proc
}
// Swarm returns the network's peerstream.Swarm
func (n *Network) Swarm() *Swarm {
return (*Swarm)(n)
}
// LocalPeer the network's LocalPeer
func (n *Network) LocalPeer() peer.ID {
return n.Swarm().LocalPeer()
}
// Peers returns the known peer IDs from the Peerstore
func (n *Network) Peers() []peer.ID {
return n.Swarm().Peers()
}
// Peerstore returns the Peerstore, which tracks known peers
func (n *Network) Peerstore() pstore.Peerstore {
return n.Swarm().peers
}
// Conns returns the connected peers
func (n *Network) Conns() []inet.Conn {
conns1 := n.Swarm().Connections()
out := make([]inet.Conn, len(conns1))
for i, c := range conns1 {
out[i] = inet.Conn(c)
}
return out
}
// ConnsToPeer returns the connections in this Netowrk for given peer.
func (n *Network) ConnsToPeer(p peer.ID) []inet.Conn {
conns1 := n.Swarm().ConnectionsToPeer(p)
out := make([]inet.Conn, len(conns1))
for i, c := range conns1 {
out[i] = inet.Conn(c)
}
return out
}
// ClosePeer connection to peer
func (n *Network) ClosePeer(p peer.ID) error {
return n.Swarm().CloseConnection(p)
}
// close is the real teardown function
func (n *Network) close() error {
return n.Swarm().Close()
}
// Close calls the ContextCloser func
func (n *Network) Close() error {
return n.Swarm().proc.Close()
}
// Listen tells the network to start listening on given multiaddrs.
func (n *Network) Listen(addrs ...ma.Multiaddr) error {
return n.Swarm().Listen(addrs...)
}
// ListenAddresses returns a list of addresses at which this network listens.
func (n *Network) ListenAddresses() []ma.Multiaddr {
return n.Swarm().ListenAddresses()
}
// InterfaceListenAddresses returns a list of addresses at which this network
// listens. It expands "any interface" addresses (/ip4/0.0.0.0, /ip6/::) to
// use the known local interfaces.
func (n *Network) InterfaceListenAddresses() ([]ma.Multiaddr, error) {
return n.Swarm().InterfaceListenAddresses()
}
// Connectedness returns a state signaling connection capabilities
// For now only returns Connected || NotConnected. Expand into more later.
func (n *Network) Connectedness(p peer.ID) inet.Connectedness {
if n.Swarm().HaveConnsToPeer(p) {
return inet.Connected
}
return inet.NotConnected
}
// NewStream returns a new stream to given peer p.
// If there is no connection to p, attempts to create one.
func (n *Network) NewStream(ctx context.Context, p peer.ID) (inet.Stream, error) {
log.Debugf("[%s] network opening stream to peer [%s]", n.local, p)
s, err := n.Swarm().NewStreamWithPeer(ctx, p)
if err != nil {
return nil, err
}
return inet.Stream(s), nil
}
// SetStreamHandler sets the protocol handler on the Network's Muxer.
// This operation is threadsafe.
func (n *Network) SetStreamHandler(h inet.StreamHandler) {
n.Swarm().SetStreamHandler(h)
}
// SetConnHandler sets the conn handler on the Network.
// This operation is threadsafe.
func (n *Network) SetConnHandler(h inet.ConnHandler) {
n.Swarm().SetConnHandler(func(c *Conn) {
h(inet.Conn(c))
})
}
// String returns a string representation of Network.
func (n *Network) String() string {
return fmt.Sprintf("<Network %s>", n.LocalPeer())
}
// Notify signs up Notifiee to receive signals when events happen
func (n *Network) Notify(f inet.Notifiee) {
n.Swarm().Notify(f)
}
// StopNotify unregisters Notifiee fromr receiving signals
func (n *Network) StopNotify(f inet.Notifiee) {
n.Swarm().StopNotify(f)
}
package swarm_test
import (
"fmt"
"testing"
"time"
"context"
inet "github.com/libp2p/go-libp2p-net"
testutil "github.com/libp2p/go-libp2p/p2p/test/util"
)
// TestConnectednessCorrect starts a few networks, connects a few
// and tests Connectedness value is correct.
func TestConnectednessCorrect(t *testing.T) {
ctx := context.Background()
nets := make([]inet.Network, 4)
for i := 0; i < 4; i++ {
nets[i] = testutil.GenSwarmNetwork(t, ctx)
}
// connect 0-1, 0-2, 0-3, 1-2, 2-3
dial := func(a, b inet.Network) {
testutil.DivulgeAddresses(b, a)
if _, err := a.DialPeer(ctx, b.LocalPeer()); err != nil {
t.Fatalf("Failed to dial: %s", err)
}
}
dial(nets[0], nets[1])
dial(nets[0], nets[3])
dial(nets[1], nets[2])
dial(nets[3], nets[2])
// The notifications for new connections get sent out asynchronously.
// There is the potential for a race condition here, so we sleep to ensure
// that they have been received.
time.Sleep(time.Millisecond * 100)
// test those connected show up correctly
// test connected
expectConnectedness(t, nets[0], nets[1], inet.Connected)
expectConnectedness(t, nets[0], nets[3], inet.Connected)
expectConnectedness(t, nets[1], nets[2], inet.Connected)
expectConnectedness(t, nets[3], nets[2], inet.Connected)
// test not connected
expectConnectedness(t, nets[0], nets[2], inet.NotConnected)
expectConnectedness(t, nets[1], nets[3], inet.NotConnected)
if len(nets[0].Peers()) != 2 {
t.Fatal("expected net 0 to have two peers")
}
if len(nets[2].Conns()) != 2 {
t.Fatal("expected net 2 to have two conns")
}
if len(nets[1].ConnsToPeer(nets[3].LocalPeer())) != 0 {
t.Fatal("net 1 should have no connections to net 3")
}
if err := nets[2].ClosePeer(nets[1].LocalPeer()); err != nil {
t.Fatal(err)
}
time.Sleep(time.Millisecond * 50)
expectConnectedness(t, nets[2], nets[1], inet.NotConnected)
for _, n := range nets {
n.Close()
}
for _, n := range nets {
<-n.Process().Closed()
}
}
func expectConnectedness(t *testing.T, a, b inet.Network, expected inet.Connectedness) {
es := "%s is connected to %s, but Connectedness incorrect. %s %s %s"
atob := a.Connectedness(b.LocalPeer())
btoa := b.Connectedness(a.LocalPeer())
if atob != expected {
t.Errorf(es, a, b, printConns(a), printConns(b), atob)
}
// test symmetric case
if btoa != expected {
t.Errorf(es, b, a, printConns(b), printConns(a), btoa)
}
}
func printConns(n inet.Network) string {
s := fmt.Sprintf("Connections in %s:\n", n)
for _, c := range n.Conns() {
s = s + fmt.Sprintf("- %s\n", c)
}
return s
}
func TestNetworkOpenStream(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
nets := make([]inet.Network, 4)
for i := 0; i < 4; i++ {
nets[i] = testutil.GenSwarmNetwork(t, ctx)
}
dial := func(a, b inet.Network) {
testutil.DivulgeAddresses(b, a)
if _, err := a.DialPeer(ctx, b.LocalPeer()); err != nil {
t.Fatalf("Failed to dial: %s", err)
}
}
dial(nets[0], nets[1])
dial(nets[0], nets[3])
dial(nets[1], nets[2])
done := make(chan bool)
nets[1].SetStreamHandler(func(s inet.Stream) {
defer close(done)
defer s.Close()
buf := make([]byte, 10)
_, err := s.Read(buf)
if err != nil {
t.Error(err)
return
}
if string(buf) != "hello ipfs" {
t.Error("got wrong message")
}
})
s, err := nets[0].NewStream(ctx, nets[1].LocalPeer())
if err != nil {
t.Fatal(err)
}
_, err = s.Write([]byte("hello ipfs"))
if err != nil {
t.Fatal(err)
}
err = s.Close()
if err != nil {
t.Fatal(err)
}
select {
case <-done:
case <-time.After(time.Millisecond * 100):
t.Fatal("timed out waiting on stream")
}
_, err = nets[1].NewStream(ctx, nets[3].LocalPeer())
if err == nil {
t.Fatal("expected stream open 1->3 to fail")
}
}
package swarm
import (
"testing"
"time"
"context"
inet "github.com/libp2p/go-libp2p-net"
peer "github.com/libp2p/go-libp2p-peer"
ma "github.com/multiformats/go-multiaddr"
)
func streamsSame(a, b inet.Stream) bool {
sa := a.(*Stream)
sb := b.(*Stream)
return sa.Stream() == sb.Stream()
}
func TestNotifications(t *testing.T) {
ctx := context.Background()
swarms := makeSwarms(ctx, t, 5)
defer func() {
for _, s := range swarms {
s.Close()
}
}()
timeout := 5 * time.Second
// signup notifs
notifiees := make([]*netNotifiee, len(swarms))
for i, swarm := range swarms {
n := newNetNotifiee()
swarm.Notify(n)
notifiees[i] = n
}
connectSwarms(t, ctx, swarms)
<-time.After(time.Millisecond)
// should've gotten 5 by now.
// test everyone got the correct connection opened calls
for i, s := range swarms {
n := notifiees[i]
notifs := make(map[peer.ID][]inet.Conn)
for j, s2 := range swarms {
if i == j {
continue
}
// this feels a little sketchy, but its probably okay
for len(s.ConnectionsToPeer(s2.LocalPeer())) != len(notifs[s2.LocalPeer()]) {
select {
case c := <-n.connected:
nfp := notifs[c.RemotePeer()]
notifs[c.RemotePeer()] = append(nfp, c)
case <-time.After(timeout):
t.Fatal("timeout")
}
}
}
for p, cons := range notifs {
expect := s.ConnectionsToPeer(p)
if len(expect) != len(cons) {
t.Fatal("got different number of connections")
}
for _, c := range cons {
var found bool
for _, c2 := range expect {
if c == c2 {
found = true
break
}
}
if !found {
t.Fatal("connection not found!")
}
}
}
}
complement := func(c inet.Conn) (*Swarm, *netNotifiee, *Conn) {
for i, s := range swarms {
for _, c2 := range s.Connections() {
if c.LocalMultiaddr().Equal(c2.RemoteMultiaddr()) &&
c2.LocalMultiaddr().Equal(c.RemoteMultiaddr()) {
return s, notifiees[i], c2
}
}
}
t.Fatal("complementary conn not found", c)
return nil, nil, nil
}
testOCStream := func(n *netNotifiee, s inet.Stream) {
var s2 inet.Stream
select {
case s2 = <-n.openedStream:
t.Log("got notif for opened stream")
case <-time.After(timeout):
t.Fatal("timeout")
}
if !streamsSame(s, s2) {
t.Fatal("got incorrect stream", s.Conn(), s2.Conn())
}
select {
case s2 = <-n.closedStream:
t.Log("got notif for closed stream")
case <-time.After(timeout):
t.Fatal("timeout")
}
if !streamsSame(s, s2) {
t.Fatal("got incorrect stream", s.Conn(), s2.Conn())
}
}
streams := make(chan inet.Stream)
for _, s := range swarms {
s.SetStreamHandler(func(s inet.Stream) {
streams <- s
s.Close()
})
}
// open a streams in each conn
for i, s := range swarms {
for _, c := range s.Connections() {
_, n2, _ := complement(c)
st1, err := c.NewStream()
if err != nil {
t.Error(err)
} else {
st1.Write([]byte("hello"))
st1.Close()
testOCStream(notifiees[i], st1)
st2 := <-streams
testOCStream(n2, st2)
}
}
}
// close conns
for i, s := range swarms {
n := notifiees[i]
for _, c := range s.Connections() {
_, n2, c2 := complement(c)
c.Close()
c2.Close()
var c3, c4 inet.Conn
select {
case c3 = <-n.disconnected:
case <-time.After(timeout):
t.Fatal("timeout")
}
if c != c3 {
t.Fatal("got incorrect conn", c, c3)
}
select {
case c4 = <-n2.disconnected:
case <-time.After(timeout):
t.Fatal("timeout")
}
if c2 != c4 {
t.Fatal("got incorrect conn", c, c2)
}
}
}
}
type netNotifiee struct {
listen chan ma.Multiaddr
listenClose chan ma.Multiaddr
connected chan inet.Conn
disconnected chan inet.Conn
openedStream chan inet.Stream
closedStream chan inet.Stream
}
func newNetNotifiee() *netNotifiee {
return &netNotifiee{
listen: make(chan ma.Multiaddr),
listenClose: make(chan ma.Multiaddr),
connected: make(chan inet.Conn),
disconnected: make(chan inet.Conn),
openedStream: make(chan inet.Stream),
closedStream: make(chan inet.Stream),
}
}
func (nn *netNotifiee) Listen(n inet.Network, a ma.Multiaddr) {
nn.listen <- a
}
func (nn *netNotifiee) ListenClose(n inet.Network, a ma.Multiaddr) {
nn.listenClose <- a
}
func (nn *netNotifiee) Connected(n inet.Network, v inet.Conn) {
nn.connected <- v
}
func (nn *netNotifiee) Disconnected(n inet.Network, v inet.Conn) {
nn.disconnected <- v
}
func (nn *netNotifiee) OpenedStream(n inet.Network, v inet.Stream) {
nn.openedStream <- v
}
func (nn *netNotifiee) ClosedStream(n inet.Network, v inet.Stream) {
nn.closedStream <- v
}
package swarm
import (
inet "github.com/libp2p/go-libp2p-net"
protocol "github.com/libp2p/go-libp2p-protocol"
ps "github.com/jbenet/go-peerstream"
)
// Stream is a wrapper around a ps.Stream that exposes a way to get
// our Conn and Swarm (instead of just the ps.Conn and ps.Swarm)
type Stream struct {
stream *ps.Stream
protocol protocol.ID
}
// Stream returns the underlying peerstream.Stream
func (s *Stream) Stream() *ps.Stream {
return s.stream
}
// Conn returns the Conn associated with this Stream, as an inet.Conn
func (s *Stream) Conn() inet.Conn {
return s.SwarmConn()
}
// SwarmConn returns the Conn associated with this Stream, as a *Conn
func (s *Stream) SwarmConn() *Conn {
return (*Conn)(s.stream.Conn())
}
// Read reads bytes from a stream.
func (s *Stream) Read(p []byte) (n int, err error) {
return s.stream.Read(p)
}
// Write writes bytes to a stream, flushing for each call.
func (s *Stream) Write(p []byte) (n int, err error) {
return s.stream.Write(p)
}
// Close closes the stream, indicating this side is finished
// with the stream.
func (s *Stream) Close() error {
return s.stream.Close()
}
func (s *Stream) Protocol() protocol.ID {
return s.protocol
}
func (s *Stream) SetProtocol(p protocol.ID) {
s.protocol = p
}
func wrapStream(pss *ps.Stream) *Stream {
return &Stream{
stream: pss,
}
}
func wrapStreams(st []*ps.Stream) []*Stream {
out := make([]*Stream, len(st))
for i, s := range st {
out[i] = wrapStream(s)
}
return out
}
package swarm
import (
"bytes"
"context"
"fmt"
"io"
"net"
"sync"
"testing"
"time"
metrics "github.com/libp2p/go-libp2p-metrics"
inet "github.com/libp2p/go-libp2p-net"
peer "github.com/libp2p/go-libp2p-peer"
pstore "github.com/libp2p/go-libp2p-peerstore"
testutil "github.com/libp2p/go-testutil"
ma "github.com/multiformats/go-multiaddr"
)
func EchoStreamHandler(stream inet.Stream) {
go func() {
defer stream.Close()
// pull out the ipfs conn
c := stream.Conn()
log.Infof("%s ponging to %s", c.LocalPeer(), c.RemotePeer())
buf := make([]byte, 4)
for {
if _, err := stream.Read(buf); err != nil {
if err != io.EOF {
log.Error("ping receive error:", err)
}
return
}
if !bytes.Equal(buf, []byte("ping")) {
log.Errorf("ping receive error: ping != %s %v", buf, buf)
return
}
if _, err := stream.Write([]byte("pong")); err != nil {
log.Error("pond send error:", err)
return
}
}
}()
}
func makeDialOnlySwarm(ctx context.Context, t *testing.T) *Swarm {
id := testutil.RandIdentityOrFatal(t)
peerstore := pstore.NewPeerstore()
peerstore.AddPubKey(id.ID(), id.PublicKey())
peerstore.AddPrivKey(id.ID(), id.PrivateKey())
swarm, err := NewSwarm(ctx, nil, id.ID(), peerstore, metrics.NewBandwidthCounter())
if err != nil {
t.Fatal(err)
}
swarm.SetStreamHandler(EchoStreamHandler)
return swarm
}
func makeSwarms(ctx context.Context, t *testing.T, num int) []*Swarm {
swarms := make([]*Swarm, 0, num)
for i := 0; i < num; i++ {
localnp := testutil.RandPeerNetParamsOrFatal(t)
peerstore := pstore.NewPeerstore()
peerstore.AddPubKey(localnp.ID, localnp.PubKey)
peerstore.AddPrivKey(localnp.ID, localnp.PrivKey)
addrs := []ma.Multiaddr{localnp.Addr}
swarm, err := NewSwarm(ctx, addrs, localnp.ID, peerstore, metrics.NewBandwidthCounter())
if err != nil {
t.Fatal(err)
}
swarm.SetStreamHandler(EchoStreamHandler)
swarms = append(swarms, swarm)
}
return swarms
}
func connectSwarms(t *testing.T, ctx context.Context, swarms []*Swarm) {
var wg sync.WaitGroup
connect := func(s *Swarm, dst peer.ID, addr ma.Multiaddr) {
// TODO: make a DialAddr func.
s.peers.AddAddr(dst, addr, pstore.PermanentAddrTTL)
if _, err := s.Dial(ctx, dst); err != nil {
t.Fatal("error swarm dialing to peer", err)
}
wg.Done()
}
log.Info("Connecting swarms simultaneously.")
for _, s1 := range swarms {
for _, s2 := range swarms {
if s2.local != s1.local { // don't connect to self.
wg.Add(1)
connect(s1, s2.LocalPeer(), s2.ListenAddresses()[0]) // try the first.
}
}
}
wg.Wait()
for _, s := range swarms {
log.Infof("%s swarm routing table: %s", s.local, s.Peers())
}
}
func SubtestSwarm(t *testing.T, SwarmNum int, MsgNum int) {
// t.Skip("skipping for another test")
ctx := context.Background()
swarms := makeSwarms(ctx, t, SwarmNum)
// connect everyone
connectSwarms(t, ctx, swarms)
// ping/pong
for _, s1 := range swarms {
log.Debugf("-------------------------------------------------------")
log.Debugf("%s ping pong round", s1.local)
log.Debugf("-------------------------------------------------------")
_, cancel := context.WithCancel(ctx)
got := map[peer.ID]int{}
errChan := make(chan error, MsgNum*len(swarms))
streamChan := make(chan *Stream, MsgNum)
// send out "ping" x MsgNum to every peer
go func() {
defer close(streamChan)
var wg sync.WaitGroup
send := func(p peer.ID) {
defer wg.Done()
// first, one stream per peer (nice)
stream, err := s1.NewStreamWithPeer(ctx, p)
if err != nil {
errChan <- err
return
}
// send out ping!
for k := 0; k < MsgNum; k++ { // with k messages
msg := "ping"
log.Debugf("%s %s %s (%d)", s1.local, msg, p, k)
if _, err := stream.Write([]byte(msg)); err != nil {
errChan <- err
continue
}
}
// read it later
streamChan <- stream
}
for _, s2 := range swarms {
if s2.local == s1.local {
continue // dont send to self...
}
wg.Add(1)
go send(s2.local)
}
wg.Wait()
}()
// receive "pong" x MsgNum from every peer
go func() {
defer close(errChan)
count := 0
countShouldBe := MsgNum * (len(swarms) - 1)
for stream := range streamChan { // one per peer
defer stream.Close()
// get peer on the other side
p := stream.Conn().RemotePeer()
// receive pings
msgCount := 0
msg := make([]byte, 4)
for k := 0; k < MsgNum; k++ { // with k messages
// read from the stream
if _, err := stream.Read(msg); err != nil {
errChan <- err
continue
}
if string(msg) != "pong" {
errChan <- fmt.Errorf("unexpected message: %s", msg)
continue
}
log.Debugf("%s %s %s (%d)", s1.local, msg, p, k)
msgCount++
}
got[p] = msgCount
count += msgCount
}
if count != countShouldBe {
errChan <- fmt.Errorf("count mismatch: %d != %d", count, countShouldBe)
}
}()
// check any errors (blocks till consumer is done)
for err := range errChan {
if err != nil {
t.Error(err.Error())
}
}
log.Debugf("%s got pongs", s1.local)
if (len(swarms) - 1) != len(got) {
t.Errorf("got (%d) less messages than sent (%d).", len(got), len(swarms))
}
for p, n := range got {
if n != MsgNum {
t.Error("peer did not get all msgs", p, n, "/", MsgNum)
}
}
cancel()
<-time.After(10 * time.Millisecond)
}
for _, s := range swarms {
s.Close()
}
}
func TestSwarm(t *testing.T) {
// t.Skip("skipping for another test")
t.Parallel()
// msgs := 1000
msgs := 100
swarms := 5
SubtestSwarm(t, swarms, msgs)
}
func TestBasicSwarm(t *testing.T) {
// t.Skip("skipping for another test")
t.Parallel()
msgs := 1
swarms := 2
SubtestSwarm(t, swarms, msgs)
}
func TestConnHandler(t *testing.T) {
// t.Skip("skipping for another test")
t.Parallel()
ctx := context.Background()
swarms := makeSwarms(ctx, t, 5)
gotconn := make(chan struct{}, 10)
swarms[0].SetConnHandler(func(conn *Conn) {
gotconn <- struct{}{}
})
connectSwarms(t, ctx, swarms)
<-time.After(time.Millisecond)
// should've gotten 5 by now.
swarms[0].SetConnHandler(nil)
expect := 4
for i := 0; i < expect; i++ {
select {
case <-time.After(time.Second):
t.Fatal("failed to get connections")
case <-gotconn:
}
}
select {
case <-gotconn:
t.Fatalf("should have connected to %d swarms, got an extra.", expect)
default:
}
}
func TestAddrBlocking(t *testing.T) {
ctx := context.Background()
swarms := makeSwarms(ctx, t, 2)
swarms[0].SetConnHandler(func(conn *Conn) {
t.Errorf("no connections should happen! -- %s", conn)
})
_, block, err := net.ParseCIDR("127.0.0.1/8")
if err != nil {
t.Fatal(err)
}
swarms[1].Filters.AddDialFilter(block)
swarms[1].peers.AddAddr(swarms[0].LocalPeer(), swarms[0].ListenAddresses()[0], pstore.PermanentAddrTTL)
_, err = swarms[1].Dial(ctx, swarms[0].LocalPeer())
if err == nil {
t.Fatal("dial should have failed")
}
swarms[0].peers.AddAddr(swarms[1].LocalPeer(), swarms[1].ListenAddresses()[0], pstore.PermanentAddrTTL)
_, err = swarms[0].Dial(ctx, swarms[1].LocalPeer())
if err == nil {
t.Fatal("dial should have failed")
}
}
func TestFilterBounds(t *testing.T) {
ctx := context.Background()
swarms := makeSwarms(ctx, t, 2)
conns := make(chan struct{}, 8)
swarms[0].SetConnHandler(func(conn *Conn) {
conns <- struct{}{}
})
// Address that we wont be dialing from
_, block, err := net.ParseCIDR("192.0.0.1/8")
if err != nil {
t.Fatal(err)
}
// set filter on both sides, shouldnt matter
swarms[1].Filters.AddDialFilter(block)
swarms[0].Filters.AddDialFilter(block)
connectSwarms(t, ctx, swarms)
select {
case <-time.After(time.Second):
t.Fatal("should have gotten connection")
case <-conns:
t.Log("got connect")
}
}
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment