diff --git a/examples/hosts/main.go b/examples/hosts/main.go index bfcc3cebe0e66c4b5ac5c6eaed14e367fcc90dca..ed66256fa441340feb8f9f62b5dcc30adff21412 100644 --- a/examples/hosts/main.go +++ b/examples/hosts/main.go @@ -8,14 +8,14 @@ import ( "log" "strings" + bhost "github.com/libp2p/go-libp2p/p2p/host/basic" + host "github.com/libp2p/go-libp2p-host" inet "github.com/libp2p/go-libp2p-net" net "github.com/libp2p/go-libp2p-net" - bhost "github.com/libp2p/go-libp2p/p2p/host/basic" - swarm "github.com/libp2p/go-libp2p/p2p/net/swarm" - peer "github.com/libp2p/go-libp2p-peer" pstore "github.com/libp2p/go-libp2p-peerstore" + swarm "github.com/libp2p/go-libp2p-swarm" testutil "github.com/libp2p/go-testutil" ma "github.com/multiformats/go-multiaddr" ) diff --git a/examples/justtcp/main.go b/examples/justtcp/main.go index ac56e2360484fa55fb8d5dccfee130e0406f0de0..e9f53b00f93df508836da8f16f666a787d8320df 100644 --- a/examples/justtcp/main.go +++ b/examples/justtcp/main.go @@ -5,7 +5,7 @@ import ( "fmt" "os" - "github.com/libp2p/go-libp2p/p2p/net/swarm" + "github.com/libp2p/go-libp2p-swarm" tcpt "github.com/libp2p/go-tcp-transport" ma "github.com/multiformats/go-multiaddr" ) diff --git a/p2p/net/swarm/dial_sync.go b/p2p/net/swarm/dial_sync.go deleted file mode 100644 index 75d3f0fbee1b545c3a538f602b6113d21d083104..0000000000000000000000000000000000000000 --- a/p2p/net/swarm/dial_sync.go +++ /dev/null @@ -1,92 +0,0 @@ -package swarm - -import ( - "context" - "sync" - - peer "github.com/libp2p/go-libp2p-peer" -) - -type DialFunc func(context.Context, peer.ID) (*Conn, error) - -func NewDialSync(dfn DialFunc) *DialSync { - return &DialSync{ - dials: make(map[peer.ID]*activeDial), - dialFunc: dfn, - } -} - -type DialSync struct { - dials map[peer.ID]*activeDial - dialsLk sync.Mutex - dialFunc DialFunc -} - -type activeDial struct { - id peer.ID - refCnt int - refCntLk sync.Mutex - cancel func() - - err error - conn *Conn - waitch chan struct{} - - ds *DialSync -} - -func (dr *activeDial) wait(ctx context.Context) (*Conn, error) { - defer dr.decref() - select { - case <-dr.waitch: - return dr.conn, dr.err - case <-ctx.Done(): - return nil, ctx.Err() - } -} - -func (ad *activeDial) incref() { - ad.refCntLk.Lock() - defer ad.refCntLk.Unlock() - ad.refCnt++ -} - -func (ad *activeDial) decref() { - ad.refCntLk.Lock() - defer ad.refCntLk.Unlock() - ad.refCnt-- - if ad.refCnt <= 0 { - ad.cancel() - ad.ds.dialsLk.Lock() - delete(ad.ds.dials, ad.id) - ad.ds.dialsLk.Unlock() - } -} - -func (ds *DialSync) DialLock(ctx context.Context, p peer.ID) (*Conn, error) { - ds.dialsLk.Lock() - - actd, ok := ds.dials[p] - if !ok { - ctx, cancel := context.WithCancel(context.Background()) - actd = &activeDial{ - id: p, - cancel: cancel, - waitch: make(chan struct{}), - ds: ds, - } - ds.dials[p] = actd - - go func(ctx context.Context, p peer.ID, ad *activeDial) { - ad.conn, ad.err = ds.dialFunc(ctx, p) - close(ad.waitch) - ad.cancel() - ad.waitch = nil // to ensure nobody tries reusing this - }(ctx, p, actd) - } - - actd.incref() - ds.dialsLk.Unlock() - - return actd.wait(ctx) -} diff --git a/p2p/net/swarm/dial_sync_test.go b/p2p/net/swarm/dial_sync_test.go deleted file mode 100644 index ca81a9c87209594b5a270ba85c19fc6ac10cb803..0000000000000000000000000000000000000000 --- a/p2p/net/swarm/dial_sync_test.go +++ /dev/null @@ -1,203 +0,0 @@ -package swarm - -import ( - "context" - "fmt" - "sync" - "testing" - "time" - - peer "github.com/libp2p/go-libp2p-peer" -) - -func getMockDialFunc() (DialFunc, func(), context.Context, <-chan struct{}) { - dfcalls := make(chan struct{}, 512) // buffer it large enough that we won't care - dialctx, cancel := context.WithCancel(context.Background()) - ch := make(chan struct{}) - f := func(ctx context.Context, p peer.ID) (*Conn, error) { - dfcalls <- struct{}{} - defer cancel() - select { - case <-ch: - return new(Conn), nil - case <-ctx.Done(): - return nil, ctx.Err() - } - } - - o := new(sync.Once) - - return f, func() { o.Do(func() { close(ch) }) }, dialctx, dfcalls -} - -func TestBasicDialSync(t *testing.T) { - df, done, _, callsch := getMockDialFunc() - - dsync := NewDialSync(df) - - p := peer.ID("testpeer") - - ctx := context.Background() - - finished := make(chan struct{}) - go func() { - _, err := dsync.DialLock(ctx, p) - if err != nil { - t.Error(err) - } - finished <- struct{}{} - }() - - go func() { - _, err := dsync.DialLock(ctx, p) - if err != nil { - t.Error(err) - } - finished <- struct{}{} - }() - - // short sleep just to make sure we've moved around in the scheduler - time.Sleep(time.Millisecond * 20) - done() - - <-finished - <-finished - - if len(callsch) > 1 { - t.Fatal("should only have called dial func once!") - } -} - -func TestDialSyncCancel(t *testing.T) { - df, done, _, dcall := getMockDialFunc() - - dsync := NewDialSync(df) - - p := peer.ID("testpeer") - - ctx1, cancel1 := context.WithCancel(context.Background()) - - finished := make(chan struct{}) - go func() { - _, err := dsync.DialLock(ctx1, p) - if err != ctx1.Err() { - t.Error("should have gotten context error") - } - finished <- struct{}{} - }() - - // make sure the above makes it through the wait code first - select { - case <-dcall: - case <-time.After(time.Second): - t.Fatal("timed out waiting for dial to start") - } - - // Add a second dialwait in so two actors are waiting on the same dial - go func() { - _, err := dsync.DialLock(context.Background(), p) - if err != nil { - t.Error(err) - } - finished <- struct{}{} - }() - - time.Sleep(time.Millisecond * 20) - - // cancel the first dialwait, it should not affect the second at all - cancel1() - select { - case <-finished: - case <-time.After(time.Second): - t.Fatal("timed out waiting for wait to exit") - } - - // short sleep just to make sure we've moved around in the scheduler - time.Sleep(time.Millisecond * 20) - done() - - <-finished -} - -func TestDialSyncAllCancel(t *testing.T) { - df, done, dctx, _ := getMockDialFunc() - - dsync := NewDialSync(df) - - p := peer.ID("testpeer") - - ctx1, cancel1 := context.WithCancel(context.Background()) - - finished := make(chan struct{}) - go func() { - _, err := dsync.DialLock(ctx1, p) - if err != ctx1.Err() { - t.Error("should have gotten context error") - } - finished <- struct{}{} - }() - - // Add a second dialwait in so two actors are waiting on the same dial - go func() { - _, err := dsync.DialLock(ctx1, p) - if err != ctx1.Err() { - t.Error("should have gotten context error") - } - finished <- struct{}{} - }() - - cancel1() - for i := 0; i < 2; i++ { - select { - case <-finished: - case <-time.After(time.Second): - t.Fatal("timed out waiting for wait to exit") - } - } - - // the dial should have exited now - select { - case <-dctx.Done(): - case <-time.After(time.Second): - t.Fatal("timed out waiting for dial to return") - } - - // should be able to successfully dial that peer again - done() - _, err := dsync.DialLock(context.Background(), p) - if err != nil { - t.Fatal(err) - } -} - -func TestFailFirst(t *testing.T) { - var count int - f := func(ctx context.Context, p peer.ID) (*Conn, error) { - if count > 0 { - return new(Conn), nil - } - count++ - return nil, fmt.Errorf("gophers ate the modem") - } - - ds := NewDialSync(f) - - p := peer.ID("testing") - - ctx, cancel := context.WithTimeout(context.Background(), time.Second*5) - defer cancel() - - _, err := ds.DialLock(ctx, p) - if err == nil { - t.Fatal("expected gophers to have eaten the modem") - } - - c, err := ds.DialLock(ctx, p) - if err != nil { - t.Fatal(err) - } - - if c == nil { - t.Fatal("should have gotten a 'real' conn back") - } -} diff --git a/p2p/net/swarm/dial_test.go b/p2p/net/swarm/dial_test.go deleted file mode 100644 index dabb0bdb00e0cc623f96bda7f5724abaf51e2fcf..0000000000000000000000000000000000000000 --- a/p2p/net/swarm/dial_test.go +++ /dev/null @@ -1,493 +0,0 @@ -package swarm - -import ( - "context" - "net" - "sync" - "testing" - "time" - - addrutil "github.com/libp2p/go-addr-util" - peer "github.com/libp2p/go-libp2p-peer" - pstore "github.com/libp2p/go-libp2p-peerstore" - testutil "github.com/libp2p/go-testutil" - ci "github.com/libp2p/go-testutil/ci" - ma "github.com/multiformats/go-multiaddr" - manet "github.com/multiformats/go-multiaddr-net" -) - -func closeSwarms(swarms []*Swarm) { - for _, s := range swarms { - s.Close() - } -} - -func TestBasicDial(t *testing.T) { - t.Parallel() - ctx := context.Background() - - swarms := makeSwarms(ctx, t, 2) - defer closeSwarms(swarms) - s1 := swarms[0] - s2 := swarms[1] - - s1.peers.AddAddrs(s2.local, s2.ListenAddresses(), pstore.PermanentAddrTTL) - - c, err := s1.Dial(ctx, s2.local) - if err != nil { - t.Fatal(err) - } - - s, err := c.NewStream() - if err != nil { - t.Fatal(err) - } - - s.Close() -} - -func TestDialWithNoListeners(t *testing.T) { - t.Parallel() - ctx := context.Background() - - s1 := makeDialOnlySwarm(ctx, t) - - swarms := makeSwarms(ctx, t, 1) - defer closeSwarms(swarms) - s2 := swarms[0] - - s1.peers.AddAddrs(s2.local, s2.ListenAddresses(), pstore.PermanentAddrTTL) - - c, err := s1.Dial(ctx, s2.local) - if err != nil { - t.Fatal(err) - } - - s, err := c.NewStream() - if err != nil { - t.Fatal(err) - } - - s.Close() -} - -func acceptAndHang(l net.Listener) { - conns := make([]net.Conn, 0, 10) - for { - c, err := l.Accept() - if err != nil { - break - } - if c != nil { - conns = append(conns, c) - } - } - for _, c := range conns { - c.Close() - } -} - -func TestSimultDials(t *testing.T) { - // t.Skip("skipping for another test") - t.Parallel() - - ctx := context.Background() - swarms := makeSwarms(ctx, t, 2) - - // connect everyone - { - var wg sync.WaitGroup - connect := func(s *Swarm, dst peer.ID, addr ma.Multiaddr) { - // copy for other peer - log.Debugf("TestSimultOpen: connecting: %s --> %s (%s)", s.local, dst, addr) - s.peers.AddAddr(dst, addr, pstore.TempAddrTTL) - if _, err := s.Dial(ctx, dst); err != nil { - t.Fatal("error swarm dialing to peer", err) - } - wg.Done() - } - - ifaceAddrs0, err := swarms[0].InterfaceListenAddresses() - if err != nil { - t.Fatal(err) - } - ifaceAddrs1, err := swarms[1].InterfaceListenAddresses() - if err != nil { - t.Fatal(err) - } - - log.Info("Connecting swarms simultaneously.") - for i := 0; i < 10; i++ { // connect 10x for each. - wg.Add(2) - go connect(swarms[0], swarms[1].local, ifaceAddrs1[0]) - go connect(swarms[1], swarms[0].local, ifaceAddrs0[0]) - } - wg.Wait() - } - - // should still just have 1, at most 2 connections :) - c01l := len(swarms[0].ConnectionsToPeer(swarms[1].local)) - if c01l > 2 { - t.Error("0->1 has", c01l) - } - c10l := len(swarms[1].ConnectionsToPeer(swarms[0].local)) - if c10l > 2 { - t.Error("1->0 has", c10l) - } - - for _, s := range swarms { - s.Close() - } -} - -func newSilentPeer(t *testing.T) (peer.ID, ma.Multiaddr, net.Listener) { - dst := testutil.RandPeerIDFatal(t) - lst, err := net.Listen("tcp", ":0") - if err != nil { - t.Fatal(err) - } - addr, err := manet.FromNetAddr(lst.Addr()) - if err != nil { - t.Fatal(err) - } - addrs := []ma.Multiaddr{addr} - addrs, err = addrutil.ResolveUnspecifiedAddresses(addrs, nil) - if err != nil { - t.Fatal(err) - } - t.Log("new silent peer:", dst, addrs[0]) - return dst, addrs[0], lst -} - -func TestDialWait(t *testing.T) { - // t.Skip("skipping for another test") - t.Parallel() - - ctx := context.Background() - swarms := makeSwarms(ctx, t, 1) - s1 := swarms[0] - defer s1.Close() - - s1.dialT = time.Millisecond * 300 // lower timeout for tests. - if ci.IsRunning() { - s1.dialT = time.Second - } - - // dial to a non-existent peer. - s2p, s2addr, s2l := newSilentPeer(t) - go acceptAndHang(s2l) - defer s2l.Close() - s1.peers.AddAddr(s2p, s2addr, pstore.PermanentAddrTTL) - - before := time.Now() - if c, err := s1.Dial(ctx, s2p); err == nil { - defer c.Close() - t.Fatal("error swarm dialing to unknown peer worked...", err) - } else { - t.Log("correctly got error:", err) - } - duration := time.Since(before) - - dt := s1.dialT - if duration < dt*dialAttempts { - t.Error("< DialTimeout * dialAttempts not being respected", duration, dt*dialAttempts) - } - if duration > 2*dt*dialAttempts { - t.Error("> 2*DialTimeout * dialAttempts not being respected", duration, 2*dt*dialAttempts) - } - - if !s1.backf.Backoff(s2p) { - t.Error("s2 should now be on backoff") - } -} - -func TestDialBackoff(t *testing.T) { - // t.Skip("skipping for another test") - if ci.IsRunning() { - t.Skip("travis will never have fun with this test") - } - - t.Parallel() - - ctx := context.Background() - swarms := makeSwarms(ctx, t, 2) - s1 := swarms[0] - s2 := swarms[1] - defer s1.Close() - defer s2.Close() - - s1.dialT = time.Second // lower timeout for tests. - s2.dialT = time.Second // lower timeout for tests. - - s2addrs, err := s2.InterfaceListenAddresses() - if err != nil { - t.Fatal(err) - } - s1.peers.AddAddrs(s2.local, s2addrs, pstore.PermanentAddrTTL) - - // dial to a non-existent peer. - s3p, s3addr, s3l := newSilentPeer(t) - go acceptAndHang(s3l) - defer s3l.Close() - s1.peers.AddAddr(s3p, s3addr, pstore.PermanentAddrTTL) - - // in this test we will: - // 1) dial 10x to each node. - // 2) all dials should hang - // 3) s1->s2 should succeed. - // 4) s1->s3 should not (and should place s3 on backoff) - // 5) disconnect entirely - // 6) dial 10x to each node again - // 7) s3 dials should all return immediately (except 1) - // 8) s2 dials should all hang, and succeed - // 9) last s3 dial ends, unsuccessful - - dialOnlineNode := func(dst peer.ID, times int) <-chan bool { - ch := make(chan bool) - for i := 0; i < times; i++ { - go func() { - if _, err := s1.Dial(ctx, dst); err != nil { - t.Error("error dialing", dst, err) - ch <- false - } else { - ch <- true - } - }() - } - return ch - } - - dialOfflineNode := func(dst peer.ID, times int) <-chan bool { - ch := make(chan bool) - for i := 0; i < times; i++ { - go func() { - if c, err := s1.Dial(ctx, dst); err != nil { - ch <- false - } else { - t.Error("succeeded in dialing", dst) - ch <- true - c.Close() - } - }() - } - return ch - } - - { - // 1) dial 10x to each node. - N := 10 - s2done := dialOnlineNode(s2.local, N) - s3done := dialOfflineNode(s3p, N) - - // when all dials should be done by: - dialTimeout1x := time.After(s1.dialT) - // dialTimeout1Ax := time.After(s1.dialT * 2) // dialAttempts) - dialTimeout10Ax := time.After(s1.dialT * 2 * 10) // dialAttempts * 10) - - // 2) all dials should hang - select { - case <-s2done: - t.Error("s2 should not happen immediately") - case <-s3done: - t.Error("s3 should not happen yet") - case <-time.After(time.Millisecond): - // s2 may finish very quickly, so let's get out. - } - - // 3) s1->s2 should succeed. - for i := 0; i < N; i++ { - select { - case r := <-s2done: - if !r { - t.Error("s2 should not fail") - } - case <-s3done: - t.Error("s3 should not happen yet") - case <-dialTimeout1x: - t.Error("s2 took too long") - } - } - - select { - case <-s2done: - t.Error("s2 should have no more") - case <-s3done: - t.Error("s3 should not happen yet") - case <-dialTimeout1x: // let it pass - } - - // 4) s1->s3 should not (and should place s3 on backoff) - // N-1 should finish before dialTimeout1x * 2 - for i := 0; i < N; i++ { - select { - case <-s2done: - t.Error("s2 should have no more") - case r := <-s3done: - if r { - t.Error("s3 should not succeed") - } - case <-(dialTimeout1x): - if i < (N - 1) { - t.Fatal("s3 took too long") - } - t.Log("dialTimeout1x * 1.3 hit for last peer") - case <-dialTimeout10Ax: - t.Fatal("s3 took too long") - } - } - - // check backoff state - if s1.backf.Backoff(s2.local) { - t.Error("s2 should not be on backoff") - } - if !s1.backf.Backoff(s3p) { - t.Error("s3 should be on backoff") - } - - // 5) disconnect entirely - - for _, c := range s1.Connections() { - c.Close() - } - for i := 0; i < 100 && len(s1.Connections()) > 0; i++ { - <-time.After(time.Millisecond) - } - if len(s1.Connections()) > 0 { - t.Fatal("s1 conns must exit") - } - } - - { - // 6) dial 10x to each node again - N := 10 - s2done := dialOnlineNode(s2.local, N) - s3done := dialOfflineNode(s3p, N) - - // when all dials should be done by: - dialTimeout1x := time.After(s1.dialT) - // dialTimeout1Ax := time.After(s1.dialT * 2) // dialAttempts) - dialTimeout10Ax := time.After(s1.dialT * 2 * 10) // dialAttempts * 10) - - // 7) s3 dials should all return immediately (except 1) - for i := 0; i < N-1; i++ { - select { - case <-s2done: - t.Error("s2 should not succeed yet") - case r := <-s3done: - if r { - t.Error("s3 should not succeed") - } - case <-dialTimeout1x: - t.Fatal("s3 took too long") - } - } - - // 8) s2 dials should all hang, and succeed - for i := 0; i < N; i++ { - select { - case r := <-s2done: - if !r { - t.Error("s2 should succeed") - } - // case <-s3done: - case <-(dialTimeout1x): - t.Fatal("s3 took too long") - } - } - - // 9) the last s3 should return, failed. - select { - case <-s2done: - t.Error("s2 should have no more") - case r := <-s3done: - if r { - t.Error("s3 should not succeed") - } - case <-dialTimeout10Ax: - t.Fatal("s3 took too long") - } - - // check backoff state (the same) - if s1.backf.Backoff(s2.local) { - t.Error("s2 should not be on backoff") - } - if !s1.backf.Backoff(s3p) { - t.Error("s3 should be on backoff") - } - } -} - -func TestDialBackoffClears(t *testing.T) { - // t.Skip("skipping for another test") - t.Parallel() - - ctx := context.Background() - swarms := makeSwarms(ctx, t, 2) - s1 := swarms[0] - s2 := swarms[1] - defer s1.Close() - defer s2.Close() - s1.dialT = time.Millisecond * 300 // lower timeout for tests. - s2.dialT = time.Millisecond * 300 // lower timeout for tests. - if ci.IsRunning() { - s1.dialT = 2 * time.Second - s2.dialT = 2 * time.Second - } - - // use another address first, that accept and hang on conns - _, s2bad, s2l := newSilentPeer(t) - go acceptAndHang(s2l) - defer s2l.Close() - - // phase 1 -- dial to non-operational addresses - s1.peers.AddAddr(s2.local, s2bad, pstore.PermanentAddrTTL) - - before := time.Now() - if c, err := s1.Dial(ctx, s2.local); err == nil { - t.Fatal("dialing to broken addr worked...", err) - defer c.Close() - } else { - t.Log("correctly got error:", err) - } - duration := time.Since(before) - - dt := s1.dialT - if duration < dt*dialAttempts { - t.Error("< DialTimeout * dialAttempts not being respected", duration, dt*dialAttempts) - } - if duration > 2*dt*dialAttempts { - t.Error("> 2*DialTimeout * dialAttempts not being respected", duration, 2*dt*dialAttempts) - } - - if !s1.backf.Backoff(s2.local) { - t.Error("s2 should now be on backoff") - } else { - t.Log("correctly added to backoff") - } - - // phase 2 -- add the working address. dial should succeed. - ifaceAddrs1, err := swarms[1].InterfaceListenAddresses() - if err != nil { - t.Fatal(err) - } - s1.peers.AddAddrs(s2.local, ifaceAddrs1, pstore.PermanentAddrTTL) - - if _, err := s1.Dial(ctx, s2.local); err == nil { - t.Fatal("should have failed to dial backed off peer") - } - - time.Sleep(baseBackoffTime) - - if c, err := s1.Dial(ctx, s2.local); err != nil { - t.Fatal(err) - } else { - c.Close() - t.Log("correctly connected") - } - - if s1.backf.Backoff(s2.local) { - t.Error("s2 should no longer be on backoff") - } else { - t.Log("correctly cleared backoff") - } -} diff --git a/p2p/net/swarm/limiter.go b/p2p/net/swarm/limiter.go deleted file mode 100644 index 5e96908e41e07099b86e09587a8d7f5f1b488c33..0000000000000000000000000000000000000000 --- a/p2p/net/swarm/limiter.go +++ /dev/null @@ -1,147 +0,0 @@ -package swarm - -import ( - "context" - "sync" - - addrutil "github.com/libp2p/go-addr-util" - iconn "github.com/libp2p/go-libp2p-interface-conn" - peer "github.com/libp2p/go-libp2p-peer" - ma "github.com/multiformats/go-multiaddr" -) - -type dialResult struct { - Conn iconn.Conn - Err error -} - -type dialJob struct { - addr ma.Multiaddr - peer peer.ID - ctx context.Context - resp chan dialResult - success bool -} - -func (dj *dialJob) cancelled() bool { - select { - case <-dj.ctx.Done(): - return true - default: - return false - } -} - -type dialLimiter struct { - rllock sync.Mutex - fdConsuming int - fdLimit int - waitingOnFd []*dialJob - - dialFunc func(context.Context, peer.ID, ma.Multiaddr) (iconn.Conn, error) - - activePerPeer map[peer.ID]int - perPeerLimit int - waitingOnPeerLimit map[peer.ID][]*dialJob -} - -type dialfunc func(context.Context, peer.ID, ma.Multiaddr) (iconn.Conn, error) - -func newDialLimiter(df dialfunc) *dialLimiter { - return newDialLimiterWithParams(df, concurrentFdDials, defaultPerPeerRateLimit) -} - -func newDialLimiterWithParams(df dialfunc, fdl, ppl int) *dialLimiter { - return &dialLimiter{ - fdLimit: fdl, - perPeerLimit: ppl, - waitingOnPeerLimit: make(map[peer.ID][]*dialJob), - activePerPeer: make(map[peer.ID]int), - dialFunc: df, - } -} - -func (dl *dialLimiter) finishedDial(dj *dialJob) { - dl.rllock.Lock() - defer dl.rllock.Unlock() - - if addrutil.IsFDCostlyTransport(dj.addr) { - dl.fdConsuming-- - if len(dl.waitingOnFd) > 0 { - next := dl.waitingOnFd[0] - dl.waitingOnFd = dl.waitingOnFd[1:] - if len(dl.waitingOnFd) == 0 { - dl.waitingOnFd = nil // clear out memory - } - dl.fdConsuming++ - - go dl.executeDial(next) - } - } - - // release tokens in reverse order than we take them - dl.activePerPeer[dj.peer]-- - if dl.activePerPeer[dj.peer] == 0 { - delete(dl.activePerPeer, dj.peer) - } - - waitlist := dl.waitingOnPeerLimit[dj.peer] - if !dj.success && len(waitlist) > 0 { - next := waitlist[0] - if len(waitlist) == 1 { - delete(dl.waitingOnPeerLimit, dj.peer) - } else { - dl.waitingOnPeerLimit[dj.peer] = waitlist[1:] - } - dl.activePerPeer[dj.peer]++ // just kidding, we still want this token - - // can kick this off right here, dials in this list already - // have the other tokens needed - go dl.executeDial(next) - } - -} - -// AddDialJob tries to take the needed tokens for starting the given dial job. -// If it acquires all needed tokens, it immediately starts the dial, otherwise -// it will put it on the waitlist for the requested token. -func (dl *dialLimiter) AddDialJob(dj *dialJob) { - dl.rllock.Lock() - defer dl.rllock.Unlock() - - if dl.activePerPeer[dj.peer] >= dl.perPeerLimit { - wlist := dl.waitingOnPeerLimit[dj.peer] - dl.waitingOnPeerLimit[dj.peer] = append(wlist, dj) - return - } - dl.activePerPeer[dj.peer]++ - - if addrutil.IsFDCostlyTransport(dj.addr) { - if dl.fdConsuming >= dl.fdLimit { - dl.waitingOnFd = append(dl.waitingOnFd, dj) - return - } - - // take token - dl.fdConsuming++ - } - - // take second needed token and start dial! - go dl.executeDial(dj) -} - -// executeDial calls the dialFunc, and reports the result through the response -// channel when finished. Once the response is sent it also releases all tokens -// it held during the dial. -func (dl *dialLimiter) executeDial(j *dialJob) { - defer dl.finishedDial(j) - if j.cancelled() { - return - } - - con, err := dl.dialFunc(j.ctx, j.peer, j.addr) - select { - case j.resp <- dialResult{Conn: con, Err: err}: - case <-j.ctx.Done(): - } -} diff --git a/p2p/net/swarm/limiter_test.go b/p2p/net/swarm/limiter_test.go deleted file mode 100644 index 646019d265c11379af30099a3b7304f8fd18176c..0000000000000000000000000000000000000000 --- a/p2p/net/swarm/limiter_test.go +++ /dev/null @@ -1,314 +0,0 @@ -package swarm - -import ( - "context" - "fmt" - "math/rand" - "strconv" - "testing" - "time" - - iconn "github.com/libp2p/go-libp2p-interface-conn" - peer "github.com/libp2p/go-libp2p-peer" - ma "github.com/multiformats/go-multiaddr" - mafmt "github.com/whyrusleeping/mafmt" -) - -func mustAddr(t *testing.T, s string) ma.Multiaddr { - a, err := ma.NewMultiaddr(s) - if err != nil { - t.Fatal(err) - } - return a -} - -func addrWithPort(t *testing.T, p int) ma.Multiaddr { - return mustAddr(t, fmt.Sprintf("/ip4/127.0.0.1/tcp/%d", p)) -} - -// in these tests I use addresses with tcp ports over a certain number to -// signify 'good' addresses that will succeed, and addresses below that number -// will fail. This lets us more easily test these different scenarios. -func tcpPortOver(a ma.Multiaddr, n int) bool { - port, err := a.ValueForProtocol(ma.P_TCP) - if err != nil { - panic(err) - } - - pnum, err := strconv.Atoi(port) - if err != nil { - panic(err) - } - - return pnum > n -} - -func tryDialAddrs(ctx context.Context, l *dialLimiter, p peer.ID, addrs []ma.Multiaddr, res chan dialResult) { - for _, a := range addrs { - l.AddDialJob(&dialJob{ - ctx: ctx, - peer: p, - addr: a, - resp: res, - }) - } -} - -func hangDialFunc(hang chan struct{}) dialfunc { - return func(ctx context.Context, p peer.ID, a ma.Multiaddr) (iconn.Conn, error) { - if mafmt.UTP.Matches(a) { - return iconn.Conn(nil), nil - } - - if tcpPortOver(a, 10) { - return iconn.Conn(nil), nil - } - - <-hang - return nil, fmt.Errorf("test bad dial") - } -} - -func TestLimiterBasicDials(t *testing.T) { - hang := make(chan struct{}) - defer close(hang) - - l := newDialLimiterWithParams(hangDialFunc(hang), concurrentFdDials, 4) - - bads := []ma.Multiaddr{addrWithPort(t, 1), addrWithPort(t, 2), addrWithPort(t, 3), addrWithPort(t, 4)} - good := addrWithPort(t, 20) - - resch := make(chan dialResult) - pid := peer.ID("testpeer") - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - - tryDialAddrs(ctx, l, pid, bads, resch) - - l.AddDialJob(&dialJob{ - ctx: ctx, - peer: pid, - addr: good, - resp: resch, - }) - - select { - case <-resch: - t.Fatal("no dials should have completed!") - case <-time.After(time.Millisecond * 100): - } - - // complete a single hung dial - hang <- struct{}{} - - select { - case r := <-resch: - if r.Err == nil { - t.Fatal("should have gotten failed dial result") - } - case <-time.After(time.Second): - t.Fatal("timed out waiting for dial completion") - } - - select { - case r := <-resch: - if r.Err != nil { - t.Fatal("expected second result to be success!") - } - case <-time.After(time.Second): - } -} - -func TestFDLimiting(t *testing.T) { - hang := make(chan struct{}) - defer close(hang) - l := newDialLimiterWithParams(hangDialFunc(hang), 16, 5) - - bads := []ma.Multiaddr{addrWithPort(t, 1), addrWithPort(t, 2), addrWithPort(t, 3), addrWithPort(t, 4)} - pids := []peer.ID{"testpeer1", "testpeer2", "testpeer3", "testpeer4"} - goodTCP := addrWithPort(t, 20) - - ctx := context.Background() - resch := make(chan dialResult) - - // take all fd limit tokens with hang dials - for _, pid := range pids { - tryDialAddrs(ctx, l, pid, bads, resch) - } - - // these dials should work normally, but will hang because we have taken - // up all the fd limiting - for _, pid := range pids { - l.AddDialJob(&dialJob{ - ctx: ctx, - peer: pid, - addr: goodTCP, - resp: resch, - }) - } - - select { - case <-resch: - t.Fatal("no dials should have completed!") - case <-time.After(time.Millisecond * 100): - } - - pid5 := peer.ID("testpeer5") - utpaddr := mustAddr(t, "/ip4/127.0.0.1/udp/7777/utp") - - // This should complete immediately since utp addresses arent blocked by fd rate limiting - l.AddDialJob(&dialJob{ctx: ctx, peer: pid5, addr: utpaddr, resp: resch}) - - select { - case res := <-resch: - if res.Err != nil { - t.Fatal("should have gotten successful response") - } - case <-time.After(time.Second * 5): - t.Fatal("timeout waiting for utp addr success") - } -} - -func TestTokenRedistribution(t *testing.T) { - hangchs := make(map[peer.ID]chan struct{}) - df := func(ctx context.Context, p peer.ID, a ma.Multiaddr) (iconn.Conn, error) { - if tcpPortOver(a, 10) { - return (iconn.Conn)(nil), nil - } - - <-hangchs[p] - return nil, fmt.Errorf("test bad dial") - } - l := newDialLimiterWithParams(df, 8, 4) - - bads := []ma.Multiaddr{addrWithPort(t, 1), addrWithPort(t, 2), addrWithPort(t, 3), addrWithPort(t, 4)} - pids := []peer.ID{"testpeer1", "testpeer2"} - - ctx := context.Background() - resch := make(chan dialResult) - - // take all fd limit tokens with hang dials - for _, pid := range pids { - hangchs[pid] = make(chan struct{}) - tryDialAddrs(ctx, l, pid, bads, resch) - } - - good := mustAddr(t, "/ip4/127.0.0.1/tcp/1001") - - // add a good dial job for peer 1 - l.AddDialJob(&dialJob{ - ctx: ctx, - peer: pids[1], - addr: good, - resp: resch, - }) - - select { - case <-resch: - t.Fatal("no dials should have completed!") - case <-time.After(time.Millisecond * 100): - } - - // unblock one dial for peer 0 - hangchs[pids[0]] <- struct{}{} - - select { - case res := <-resch: - if res.Err == nil { - t.Fatal("should have only been a failure here") - } - case <-time.After(time.Millisecond * 100): - t.Fatal("expected a dial failure here") - } - - select { - case <-resch: - t.Fatal("no more dials should have completed!") - case <-time.After(time.Millisecond * 100): - } - - // add a bad dial job to peer 0 to fill their rate limiter - // and test that more dials for this peer won't interfere with peer 1's successful dial incoming - l.AddDialJob(&dialJob{ - ctx: ctx, - peer: pids[0], - addr: addrWithPort(t, 7), - resp: resch, - }) - - hangchs[pids[1]] <- struct{}{} - - // now one failed dial from peer 1 should get through and fail - // which will in turn unblock the successful dial on peer 1 - select { - case res := <-resch: - if res.Err == nil { - t.Fatal("should have only been a failure here") - } - case <-time.After(time.Millisecond * 100): - t.Fatal("expected a dial failure here") - } - - select { - case res := <-resch: - if res.Err != nil { - t.Fatal("should have succeeded!") - } - case <-time.After(time.Millisecond * 100): - t.Fatal("should have gotten successful dial") - } -} - -func TestStressLimiter(t *testing.T) { - df := func(ctx context.Context, p peer.ID, a ma.Multiaddr) (iconn.Conn, error) { - if tcpPortOver(a, 1000) { - return iconn.Conn(nil), nil - } - - time.Sleep(time.Millisecond * time.Duration(5+rand.Intn(100))) - return nil, fmt.Errorf("test bad dial") - } - - l := newDialLimiterWithParams(df, 20, 5) - - var bads []ma.Multiaddr - for i := 0; i < 100; i++ { - bads = append(bads, addrWithPort(t, i)) - } - - addresses := append(bads, addrWithPort(t, 2000)) - success := make(chan struct{}) - - for i := 0; i < 20; i++ { - go func(id peer.ID) { - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - - resp := make(chan dialResult) - time.Sleep(time.Duration(rand.Intn(10)) * time.Millisecond) - for _, i := range rand.Perm(len(addresses)) { - l.AddDialJob(&dialJob{ - addr: addresses[i], - ctx: ctx, - peer: id, - resp: resp, - }) - } - - for res := range resp { - if res.Err == nil { - success <- struct{}{} - return - } - } - }(peer.ID(fmt.Sprintf("testpeer%d", i))) - } - - for i := 0; i < 20; i++ { - select { - case <-success: - case <-time.After(time.Second * 5): - t.Fatal("expected a success within five seconds") - } - } -} diff --git a/p2p/net/swarm/peers_test.go b/p2p/net/swarm/peers_test.go deleted file mode 100644 index 3259264ae82fce2584bf41a135f1b360be626376..0000000000000000000000000000000000000000 --- a/p2p/net/swarm/peers_test.go +++ /dev/null @@ -1,71 +0,0 @@ -package swarm - -import ( - "testing" - - "context" - peer "github.com/libp2p/go-libp2p-peer" - pstore "github.com/libp2p/go-libp2p-peerstore" - ma "github.com/multiformats/go-multiaddr" -) - -func TestPeers(t *testing.T) { - - ctx := context.Background() - swarms := makeSwarms(ctx, t, 2) - s1 := swarms[0] - s2 := swarms[1] - - connect := func(s *Swarm, dst peer.ID, addr ma.Multiaddr) { - // TODO: make a DialAddr func. - s.peers.AddAddr(dst, addr, pstore.PermanentAddrTTL) - // t.Logf("connections from %s", s.LocalPeer()) - // for _, c := range s.ConnectionsToPeer(dst) { - // t.Logf("connection from %s to %s: %v", s.LocalPeer(), dst, c) - // } - // t.Logf("") - if _, err := s.Dial(ctx, dst); err != nil { - t.Fatal("error swarm dialing to peer", err) - } - // t.Log(s.swarm.Dump()) - } - - s1GotConn := make(chan struct{}, 0) - s2GotConn := make(chan struct{}, 0) - s1.SetConnHandler(func(c *Conn) { - s1GotConn <- struct{}{} - }) - s2.SetConnHandler(func(c *Conn) { - s2GotConn <- struct{}{} - }) - - connect(s1, s2.LocalPeer(), s2.ListenAddresses()[0]) - <-s2GotConn // have to wait here so the other side catches up. - connect(s2, s1.LocalPeer(), s1.ListenAddresses()[0]) - - for i := 0; i < 100; i++ { - connect(s1, s2.LocalPeer(), s2.ListenAddresses()[0]) - connect(s2, s1.LocalPeer(), s1.ListenAddresses()[0]) - } - - for _, s := range swarms { - log.Infof("%s swarm routing table: %s", s.local, s.Peers()) - } - - test := func(s *Swarm) { - expect := 1 - actual := len(s.Peers()) - if actual != expect { - t.Errorf("%s has %d peers, not %d: %v", s.LocalPeer(), actual, expect, s.Peers()) - t.Log(s.swarm.Dump()) - } - actual = len(s.Connections()) - if actual != expect { - t.Errorf("%s has %d conns, not %d: %v", s.LocalPeer(), actual, expect, s.Connections()) - t.Log(s.swarm.Dump()) - } - } - - test(s1) - test(s2) -} diff --git a/p2p/net/swarm/simul_test.go b/p2p/net/swarm/simul_test.go deleted file mode 100644 index 8935773296a26518af8230f8c33386b4366d3bf9..0000000000000000000000000000000000000000 --- a/p2p/net/swarm/simul_test.go +++ /dev/null @@ -1,77 +0,0 @@ -package swarm - -import ( - "context" - "runtime" - "sync" - "testing" - "time" - - peer "github.com/libp2p/go-libp2p-peer" - pstore "github.com/libp2p/go-libp2p-peerstore" - ci "github.com/libp2p/go-testutil/ci" - ma "github.com/multiformats/go-multiaddr" -) - -func TestSimultOpen(t *testing.T) { - - t.Parallel() - - ctx := context.Background() - swarms := makeSwarms(ctx, t, 2) - - // connect everyone - { - var wg sync.WaitGroup - connect := func(s *Swarm, dst peer.ID, addr ma.Multiaddr) { - // copy for other peer - log.Debugf("TestSimultOpen: connecting: %s --> %s (%s)", s.local, dst, addr) - s.peers.AddAddr(dst, addr, pstore.PermanentAddrTTL) - if _, err := s.Dial(ctx, dst); err != nil { - t.Fatal("error swarm dialing to peer", err) - } - wg.Done() - } - - log.Info("Connecting swarms simultaneously.") - wg.Add(2) - go connect(swarms[0], swarms[1].local, swarms[1].ListenAddresses()[0]) - go connect(swarms[1], swarms[0].local, swarms[0].ListenAddresses()[0]) - wg.Wait() - } - - for _, s := range swarms { - s.Close() - } -} - -func TestSimultOpenMany(t *testing.T) { - // t.Skip("very very slow") - - addrs := 20 - rounds := 10 - if ci.IsRunning() || runtime.GOOS == "darwin" { - // osx has a limit of 256 file descriptors - addrs = 10 - rounds = 5 - } - SubtestSwarm(t, addrs, rounds) -} - -func TestSimultOpenFewStress(t *testing.T) { - if testing.Short() { - t.SkipNow() - } - // t.Skip("skipping for another test") - t.Parallel() - - msgs := 40 - swarms := 2 - rounds := 10 - // rounds := 100 - - for i := 0; i < rounds; i++ { - SubtestSwarm(t, swarms, msgs) - <-time.After(10 * time.Millisecond) - } -} diff --git a/p2p/net/swarm/swarm.go b/p2p/net/swarm/swarm.go deleted file mode 100644 index bb179bbc69efda9fdd6621a702d5db73c9913b65..0000000000000000000000000000000000000000 --- a/p2p/net/swarm/swarm.go +++ /dev/null @@ -1,395 +0,0 @@ -// Package swarm implements a connection muxer with a pair of channels -// to synchronize all network communication. -package swarm - -import ( - "context" - "fmt" - "io/ioutil" - "os" - "strings" - "sync" - "time" - - logging "github.com/ipfs/go-log" - ps "github.com/jbenet/go-peerstream" - pst "github.com/jbenet/go-stream-muxer" - "github.com/jbenet/goprocess" - goprocessctx "github.com/jbenet/goprocess/context" - addrutil "github.com/libp2p/go-addr-util" - conn "github.com/libp2p/go-libp2p-conn" - ci "github.com/libp2p/go-libp2p-crypto" - metrics "github.com/libp2p/go-libp2p-metrics" - mconn "github.com/libp2p/go-libp2p-metrics/conn" - inet "github.com/libp2p/go-libp2p-net" - peer "github.com/libp2p/go-libp2p-peer" - pstore "github.com/libp2p/go-libp2p-peerstore" - transport "github.com/libp2p/go-libp2p-transport" - filter "github.com/libp2p/go-maddr-filter" - tcpt "github.com/libp2p/go-tcp-transport" - ma "github.com/multiformats/go-multiaddr" - psmss "github.com/whyrusleeping/go-smux-multistream" - spdy "github.com/whyrusleeping/go-smux-spdystream" - yamux "github.com/whyrusleeping/go-smux-yamux" - mafilter "github.com/whyrusleeping/multiaddr-filter" - ws "github.com/whyrusleeping/ws-transport" -) - -var log = logging.Logger("swarm2") - -// PSTransport is the default peerstream transport that will be used by -// any libp2p swarms. -var PSTransport pst.Transport - -func init() { - msstpt := psmss.NewBlankTransport() - - ymxtpt := &yamux.Transport{ - AcceptBacklog: 8192, - ConnectionWriteTimeout: time.Second * 10, - KeepAliveInterval: time.Second * 30, - EnableKeepAlive: true, - MaxStreamWindowSize: uint32(1024 * 512), - LogOutput: ioutil.Discard, - } - - msstpt.AddTransport("/yamux/1.0.0", ymxtpt) - msstpt.AddTransport("/spdy/3.1.0", spdy.Transport) - - // allow overriding of muxer preferences - if prefs := os.Getenv("LIBP2P_MUX_PREFS"); prefs != "" { - msstpt.OrderPreference = strings.Fields(prefs) - } - - PSTransport = msstpt -} - -// Swarm is a connection muxer, allowing connections to other peers to -// be opened and closed, while still using the same Chan for all -// communication. The Chan sends/receives Messages, which note the -// destination or source Peer. -// -// Uses peerstream.Swarm -type Swarm struct { - swarm *ps.Swarm - local peer.ID - peers pstore.Peerstore - connh ConnHandler - - dsync *DialSync - backf dialbackoff - dialT time.Duration // mainly for tests - - dialer *conn.Dialer - - notifmu sync.RWMutex - notifs map[inet.Notifiee]ps.Notifiee - - transports []transport.Transport - - // filters for addresses that shouldnt be dialed - Filters *filter.Filters - - // file descriptor rate limited - fdRateLimit chan struct{} - - proc goprocess.Process - ctx context.Context - bwc metrics.Reporter - - limiter *dialLimiter -} - -// NewSwarm constructs a Swarm, with a Chan. -func NewSwarm(ctx context.Context, listenAddrs []ma.Multiaddr, - local peer.ID, peers pstore.Peerstore, bwc metrics.Reporter) (*Swarm, error) { - - listenAddrs, err := filterAddrs(listenAddrs) - if err != nil { - return nil, err - } - - var wrap func(c transport.Conn) transport.Conn - if bwc != nil { - wrap = func(c transport.Conn) transport.Conn { - return mconn.WrapConn(bwc, c) - } - } - - s := &Swarm{ - swarm: ps.NewSwarm(PSTransport), - local: local, - peers: peers, - ctx: ctx, - dialT: DialTimeout, - notifs: make(map[inet.Notifiee]ps.Notifiee), - transports: []transport.Transport{ - tcpt.NewTCPTransport(), - new(ws.WebsocketTransport), - }, - bwc: bwc, - fdRateLimit: make(chan struct{}, concurrentFdDials), - Filters: filter.NewFilters(), - dialer: conn.NewDialer(local, peers.PrivKey(local), wrap), - } - - s.dsync = NewDialSync(s.doDial) - s.limiter = newDialLimiter(s.dialAddr) - - // configure Swarm - s.proc = goprocessctx.WithContextAndTeardown(ctx, s.teardown) - s.SetConnHandler(nil) // make sure to setup our own conn handler. - - err = s.setupInterfaces(listenAddrs) - if err != nil { - return nil, err - } - - return s, nil -} - -func NewBlankSwarm(ctx context.Context, id peer.ID, privkey ci.PrivKey, pstpt pst.Transport) *Swarm { - s := &Swarm{ - swarm: ps.NewSwarm(pstpt), - local: id, - peers: pstore.NewPeerstore(), - ctx: ctx, - dialT: DialTimeout, - notifs: make(map[inet.Notifiee]ps.Notifiee), - fdRateLimit: make(chan struct{}, concurrentFdDials), - Filters: filter.NewFilters(), - dialer: conn.NewDialer(id, privkey, nil), - } - - // configure Swarm - s.limiter = newDialLimiter(s.dialAddr) - s.proc = goprocessctx.WithContextAndTeardown(ctx, s.teardown) - s.SetConnHandler(nil) // make sure to setup our own conn handler. - - return s -} - -func (s *Swarm) AddTransport(t transport.Transport) { - s.transports = append(s.transports, t) -} - -func (s *Swarm) teardown() error { - return s.swarm.Close() -} - -// AddAddrFilter adds a multiaddr filter to the set of filters the swarm will -// use to determine which addresses not to dial to. -func (s *Swarm) AddAddrFilter(f string) error { - m, err := mafilter.NewMask(f) - if err != nil { - return err - } - - s.Filters.AddDialFilter(m) - return nil -} - -func filterAddrs(listenAddrs []ma.Multiaddr) ([]ma.Multiaddr, error) { - if len(listenAddrs) > 0 { - filtered := addrutil.FilterUsableAddrs(listenAddrs) - if len(filtered) < 1 { - return nil, fmt.Errorf("swarm cannot use any addr in: %s", listenAddrs) - } - listenAddrs = filtered - } - - return listenAddrs, nil -} - -// Listen sets up listeners for all of the given addresses -func (s *Swarm) Listen(addrs ...ma.Multiaddr) error { - addrs, err := filterAddrs(addrs) - if err != nil { - return err - } - - return s.setupInterfaces(addrs) -} - -// Process returns the Process of the swarm -func (s *Swarm) Process() goprocess.Process { - return s.proc -} - -// Context returns the context of the swarm -func (s *Swarm) Context() context.Context { - return s.ctx -} - -// Close stops the Swarm. -func (s *Swarm) Close() error { - return s.proc.Close() -} - -// StreamSwarm returns the underlying peerstream.Swarm -func (s *Swarm) StreamSwarm() *ps.Swarm { - return s.swarm -} - -// SetConnHandler assigns the handler for new connections. -// See peerstream. You will rarely use this. See SetStreamHandler -func (s *Swarm) SetConnHandler(handler ConnHandler) { - - // handler is nil if user wants to clear the old handler. - if handler == nil { - s.swarm.SetConnHandler(func(psconn *ps.Conn) { - s.connHandler(psconn) - }) - return - } - - s.swarm.SetConnHandler(func(psconn *ps.Conn) { - // sc is nil if closed in our handler. - if sc := s.connHandler(psconn); sc != nil { - // call the user's handler. in a goroutine for sync safety. - go handler(sc) - } - }) -} - -// SetStreamHandler assigns the handler for new streams. -// See peerstream. -func (s *Swarm) SetStreamHandler(handler inet.StreamHandler) { - s.swarm.SetStreamHandler(func(s *ps.Stream) { - handler(wrapStream(s)) - }) -} - -// NewStreamWithPeer creates a new stream on any available connection to p -func (s *Swarm) NewStreamWithPeer(ctx context.Context, p peer.ID) (*Stream, error) { - // if we have no connections, try connecting. - if len(s.ConnectionsToPeer(p)) == 0 { - log.Debug("Swarm: NewStreamWithPeer no connections. Attempting to connect...") - if _, err := s.Dial(ctx, p); err != nil { - return nil, err - } - } - log.Debug("Swarm: NewStreamWithPeer...") - - // TODO: think about passing a context down to NewStreamWithGroup - st, err := s.swarm.NewStreamWithGroup(p) - return wrapStream(st), err -} - -// StreamsWithPeer returns all the live Streams to p -func (s *Swarm) StreamsWithPeer(p peer.ID) []*Stream { - return wrapStreams(ps.StreamsWithGroup(p, s.swarm.Streams())) -} - -// ConnectionsToPeer returns all the live connections to p -func (s *Swarm) ConnectionsToPeer(p peer.ID) []*Conn { - return wrapConns(ps.ConnsWithGroup(p, s.swarm.Conns())) -} - -func (s *Swarm) HaveConnsToPeer(p peer.ID) bool { - for _, c := range s.swarm.Conns() { - if c.InGroup(p) { - return true - } - } - return false -} - -// Connections returns a slice of all connections. -func (s *Swarm) Connections() []*Conn { - return wrapConns(s.swarm.Conns()) -} - -// CloseConnection removes a given peer from swarm + closes the connection -func (s *Swarm) CloseConnection(p peer.ID) error { - conns := s.swarm.ConnsWithGroup(p) // boom. - for _, c := range conns { - c.Close() - } - return nil -} - -// Peers returns a copy of the set of peers swarm is connected to. -func (s *Swarm) Peers() []peer.ID { - conns := s.Connections() - - seen := make(map[peer.ID]struct{}) - peers := make([]peer.ID, 0, len(conns)) - for _, c := range conns { - p := c.RemotePeer() - if _, found := seen[p]; found { - continue - } - - seen[p] = struct{}{} - peers = append(peers, p) - } - return peers -} - -// LocalPeer returns the local peer swarm is associated to. -func (s *Swarm) LocalPeer() peer.ID { - return s.local -} - -// Backoff returns the dialbackoff object for this swarm. -func (s *Swarm) Backoff() *dialbackoff { - return &s.backf -} - -// notifyAll sends a signal to all Notifiees -func (s *Swarm) notifyAll(notify func(inet.Notifiee)) { - s.notifmu.RLock() - for f := range s.notifs { - go notify(f) - } - s.notifmu.RUnlock() -} - -// Notify signs up Notifiee to receive signals when events happen -func (s *Swarm) Notify(f inet.Notifiee) { - // wrap with our notifiee, to translate function calls - n := &ps2netNotifee{net: (*Network)(s), not: f} - - s.notifmu.Lock() - s.notifs[f] = n - s.notifmu.Unlock() - - // register for notifications in the peer swarm. - s.swarm.Notify(n) -} - -// StopNotify unregisters Notifiee fromr receiving signals -func (s *Swarm) StopNotify(f inet.Notifiee) { - s.notifmu.Lock() - n, found := s.notifs[f] - if found { - delete(s.notifs, f) - } - s.notifmu.Unlock() - - if found { - s.swarm.StopNotify(n) - } -} - -type ps2netNotifee struct { - net *Network - not inet.Notifiee -} - -func (n *ps2netNotifee) Connected(c *ps.Conn) { - n.not.Connected(n.net, inet.Conn((*Conn)(c))) -} - -func (n *ps2netNotifee) Disconnected(c *ps.Conn) { - n.not.Disconnected(n.net, inet.Conn((*Conn)(c))) -} - -func (n *ps2netNotifee) OpenedStream(s *ps.Stream) { - n.not.OpenedStream(n.net, &Stream{stream: s}) -} - -func (n *ps2netNotifee) ClosedStream(s *ps.Stream) { - n.not.ClosedStream(n.net, &Stream{stream: s}) -} diff --git a/p2p/net/swarm/swarm_addr.go b/p2p/net/swarm/swarm_addr.go deleted file mode 100644 index 78a3f5351a10697ea241f0f498bf05d3d1d17aa5..0000000000000000000000000000000000000000 --- a/p2p/net/swarm/swarm_addr.go +++ /dev/null @@ -1,26 +0,0 @@ -package swarm - -import ( - addrutil "github.com/libp2p/go-addr-util" - iconn "github.com/libp2p/go-libp2p-interface-conn" - ma "github.com/multiformats/go-multiaddr" -) - -// ListenAddresses returns a list of addresses at which this swarm listens. -func (s *Swarm) ListenAddresses() []ma.Multiaddr { - listeners := s.swarm.Listeners() - addrs := make([]ma.Multiaddr, 0, len(listeners)) - for _, l := range listeners { - if l2, ok := l.NetListener().(iconn.Listener); ok { - addrs = append(addrs, l2.Multiaddr()) - } - } - return addrs -} - -// InterfaceListenAddresses returns a list of addresses at which this swarm -// listens. It expands "any interface" addresses (/ip4/0.0.0.0, /ip6/::) to -// use the known local interfaces. -func (s *Swarm) InterfaceListenAddresses() ([]ma.Multiaddr, error) { - return addrutil.ResolveUnspecifiedAddresses(s.ListenAddresses(), nil) -} diff --git a/p2p/net/swarm/swarm_addr_test.go b/p2p/net/swarm/swarm_addr_test.go deleted file mode 100644 index 011b706d813c804922d7cd0775dba8195e7caa0b..0000000000000000000000000000000000000000 --- a/p2p/net/swarm/swarm_addr_test.go +++ /dev/null @@ -1,122 +0,0 @@ -package swarm - -import ( - "context" - "testing" - - addrutil "github.com/libp2p/go-addr-util" - metrics "github.com/libp2p/go-libp2p-metrics" - pstore "github.com/libp2p/go-libp2p-peerstore" - testutil "github.com/libp2p/go-testutil" - ma "github.com/multiformats/go-multiaddr" -) - -func TestFilterAddrs(t *testing.T) { - - m := func(s string) ma.Multiaddr { - maddr, err := ma.NewMultiaddr(s) - if err != nil { - t.Fatal(err) - } - return maddr - } - - bad := []ma.Multiaddr{ - m("/ip4/1.2.3.4/udp/1234"), // unreliable - m("/ip4/1.2.3.4/udp/1234/sctp/1234"), // not in manet - m("/ip4/1.2.3.4/udp/1234/udt"), // udt is broken on arm - m("/ip6/fe80::1/tcp/0"), // link local - m("/ip6/fe80::100/tcp/1234"), // link local - } - - good := []ma.Multiaddr{ - m("/ip4/127.0.0.1/tcp/0"), - m("/ip6/::1/tcp/0"), - m("/ip4/1.2.3.4/udp/1234/utp"), - } - - goodAndBad := append(good, bad...) - - // test filters - - for _, a := range bad { - if addrutil.AddrUsable(a, false) { - t.Errorf("addr %s should be unusable", a) - } - } - - for _, a := range good { - if !addrutil.AddrUsable(a, false) { - t.Errorf("addr %s should be usable", a) - } - } - - subtestAddrsEqual(t, addrutil.FilterUsableAddrs(bad), []ma.Multiaddr{}) - subtestAddrsEqual(t, addrutil.FilterUsableAddrs(good), good) - subtestAddrsEqual(t, addrutil.FilterUsableAddrs(goodAndBad), good) - - // now test it with swarm - - id, err := testutil.RandPeerID() - if err != nil { - t.Fatal(err) - } - - ps := pstore.NewPeerstore() - ctx := context.Background() - - if _, err := NewNetwork(ctx, bad, id, ps, metrics.NewBandwidthCounter()); err == nil { - t.Fatal("should have failed to create swarm") - } - - if _, err := NewNetwork(ctx, goodAndBad, id, ps, metrics.NewBandwidthCounter()); err != nil { - t.Fatal("should have succeeded in creating swarm", err) - } -} - -func subtestAddrsEqual(t *testing.T, a, b []ma.Multiaddr) { - if len(a) != len(b) { - t.Error(t) - } - - in := func(addr ma.Multiaddr, l []ma.Multiaddr) bool { - for _, addr2 := range l { - if addr.Equal(addr2) { - return true - } - } - return false - } - - for _, aa := range a { - if !in(aa, b) { - t.Errorf("%s not in %s", aa, b) - } - } -} - -func TestDialBadAddrs(t *testing.T) { - - m := func(s string) ma.Multiaddr { - maddr, err := ma.NewMultiaddr(s) - if err != nil { - t.Fatal(err) - } - return maddr - } - - ctx := context.Background() - s := makeSwarms(ctx, t, 1)[0] - - test := func(a ma.Multiaddr) { - p := testutil.RandPeerIDFatal(t) - s.peers.AddAddr(p, a, pstore.PermanentAddrTTL) - if _, err := s.Dial(ctx, p); err == nil { - t.Errorf("swarm should not dial: %s", p) - } - } - - test(m("/ip6/fe80::1")) // link local - test(m("/ip6/fe80::100")) // link local - test(m("/ip4/127.0.0.1/udp/1234/utp")) // utp -} diff --git a/p2p/net/swarm/swarm_conn.go b/p2p/net/swarm/swarm_conn.go deleted file mode 100644 index 72609f0f329cc1e1759d6deb1e5b7226df972920..0000000000000000000000000000000000000000 --- a/p2p/net/swarm/swarm_conn.go +++ /dev/null @@ -1,141 +0,0 @@ -package swarm - -import ( - "context" - "fmt" - - ps "github.com/jbenet/go-peerstream" - ic "github.com/libp2p/go-libp2p-crypto" - iconn "github.com/libp2p/go-libp2p-interface-conn" - inet "github.com/libp2p/go-libp2p-net" - peer "github.com/libp2p/go-libp2p-peer" - ma "github.com/multiformats/go-multiaddr" -) - -// Conn is a simple wrapper around a ps.Conn that also exposes -// some of the methods from the underlying conn.Conn. -// There's **five** "layers" to each connection: -// * 0. the net.Conn - underlying net.Conn (TCP/UDP/UTP/etc) -// * 1. the manet.Conn - provides multiaddr friendly Conn -// * 2. the conn.Conn - provides Peer friendly Conn (inc Secure channel) -// * 3. the peerstream.Conn - provides peerstream / spdysptream happiness -// * 4. the Conn - abstracts everyting out, exposing only key parts of underlying layers -// (I know, this is kinda crazy. it's more historical than a good design. though the -// layers do build up pieces of functionality. and they're all just io.RW :) ) -type Conn ps.Conn - -// ConnHandler is called when new conns are opened from remote peers. -// See peerstream.ConnHandler -type ConnHandler func(*Conn) - -func (c *Conn) StreamConn() *ps.Conn { - return (*ps.Conn)(c) -} - -func (c *Conn) RawConn() iconn.Conn { - // righly panic if these things aren't true. it is an expected - // invariant that these Conns are all of the typewe expect: - // ps.Conn wrapping a conn.Conn - // if we get something else it is programmer error. - return (*ps.Conn)(c).NetConn().(iconn.Conn) -} - -func (c *Conn) String() string { - return fmt.Sprintf("", c.RawConn()) -} - -// LocalMultiaddr is the Multiaddr on this side -func (c *Conn) LocalMultiaddr() ma.Multiaddr { - return c.RawConn().LocalMultiaddr() -} - -// LocalPeer is the Peer on our side of the connection -func (c *Conn) LocalPeer() peer.ID { - return c.RawConn().LocalPeer() -} - -// RemoteMultiaddr is the Multiaddr on the remote side -func (c *Conn) RemoteMultiaddr() ma.Multiaddr { - return c.RawConn().RemoteMultiaddr() -} - -// RemotePeer is the Peer on the remote side -func (c *Conn) RemotePeer() peer.ID { - return c.RawConn().RemotePeer() -} - -// LocalPrivateKey is the public key of the peer on this side -func (c *Conn) LocalPrivateKey() ic.PrivKey { - return c.RawConn().LocalPrivateKey() -} - -// RemotePublicKey is the public key of the peer on the remote side -func (c *Conn) RemotePublicKey() ic.PubKey { - return c.RawConn().RemotePublicKey() -} - -// NewSwarmStream returns a new Stream from this connection -func (c *Conn) NewSwarmStream() (*Stream, error) { - s, err := c.StreamConn().NewStream() - return wrapStream(s), err -} - -// NewStream returns a new Stream from this connection -func (c *Conn) NewStream() (inet.Stream, error) { - s, err := c.NewSwarmStream() - return inet.Stream(s), err -} - -// Close closes the underlying stream connection -func (c *Conn) Close() error { - return c.StreamConn().Close() -} - -func wrapConn(psc *ps.Conn) (*Conn, error) { - // grab the underlying connection. - if _, ok := psc.NetConn().(iconn.Conn); !ok { - // this should never happen. if we see it ocurring it means that we added - // a Listener to the ps.Swarm that is NOT one of our net/conn.Listener. - return nil, fmt.Errorf("swarm connHandler: invalid conn (not a conn.Conn): %s", psc) - } - return (*Conn)(psc), nil -} - -// wrapConns returns a *Conn for all these ps.Conns -func wrapConns(conns1 []*ps.Conn) []*Conn { - conns2 := make([]*Conn, len(conns1)) - for i, c1 := range conns1 { - if c2, err := wrapConn(c1); err == nil { - conns2[i] = c2 - } - } - return conns2 -} - -// newConnSetup does the swarm's "setup" for a connection. returns the underlying -// conn.Conn this method is used by both swarm.Dial and ps.Swarm connHandler -func (s *Swarm) newConnSetup(ctx context.Context, psConn *ps.Conn) (*Conn, error) { - - // wrap with a Conn - sc, err := wrapConn(psConn) - if err != nil { - return nil, err - } - - // if we have a public key, make sure we add it to our peerstore! - // This is an important detail. Otherwise we must fetch the public - // key from the DHT or some other system. - if pk := sc.RemotePublicKey(); pk != nil { - s.peers.AddPubKey(sc.RemotePeer(), pk) - } - - // ok great! we can use it. add it to our group. - - // set the RemotePeer as a group on the conn. this lets us group - // connections in the StreamSwarm by peer, and get a streams from - // any available connection in the group (better multiconn): - // swarm.StreamSwarm().NewStreamWithGroup(remotePeer) - psConn.AddGroup(sc.RemotePeer()) - - return sc, nil -} diff --git a/p2p/net/swarm/swarm_dial.go b/p2p/net/swarm/swarm_dial.go deleted file mode 100644 index 0f153016983a83403bd4fccf908373bb6e61ab0c..0000000000000000000000000000000000000000 --- a/p2p/net/swarm/swarm_dial.go +++ /dev/null @@ -1,409 +0,0 @@ -package swarm - -import ( - "context" - "errors" - "fmt" - "sync" - "time" - - addrutil "github.com/libp2p/go-addr-util" - iconn "github.com/libp2p/go-libp2p-interface-conn" - lgbl "github.com/libp2p/go-libp2p-loggables" - peer "github.com/libp2p/go-libp2p-peer" - ma "github.com/multiformats/go-multiaddr" -) - -// Diagram of dial sync: -// -// many callers of Dial() synched w. dials many addrs results to callers -// ----------------------\ dialsync use earliest /-------------- -// -----------------------\ |----------\ /---------------- -// ------------------------>------------<------- >---------<----------------- -// -----------------------| \----x \---------------- -// ----------------------| \-----x \--------------- -// any may fail if no addr at end -// retry dialAttempt x - -var ( - // ErrDialBackoff is returned by the backoff code when a given peer has - // been dialed too frequently - ErrDialBackoff = errors.New("dial backoff") - - // ErrDialFailed is returned when connecting to a peer has ultimately failed - ErrDialFailed = errors.New("dial attempt failed") - - // ErrDialToSelf is returned if we attempt to dial our own peer - ErrDialToSelf = errors.New("dial to self attempted") -) - -// dialAttempts governs how many times a goroutine will try to dial a given peer. -// Note: this is down to one, as we have _too many dials_ atm. To add back in, -// add loop back in Dial(.) -const dialAttempts = 1 - -// number of concurrent outbound dials over transports that consume file descriptors -const concurrentFdDials = 160 - -// number of concurrent outbound dials to make per peer -const defaultPerPeerRateLimit = 8 - -// DialTimeout is the amount of time each dial attempt has. We can think about making -// this larger down the road, or putting more granular timeouts (i.e. within each -// subcomponent of Dial) -var DialTimeout = time.Second * 10 - -// dialbackoff is a struct used to avoid over-dialing the same, dead peers. -// Whenever we totally time out on a peer (all three attempts), we add them -// to dialbackoff. Then, whenevers goroutines would _wait_ (dialsync), they -// check dialbackoff. If it's there, they don't wait and exit promptly with -// an error. (the single goroutine that is actually dialing continues to -// dial). If a dial is successful, the peer is removed from backoff. -// Example: -// -// for { -// if ok, wait := dialsync.Lock(p); !ok { -// if backoff.Backoff(p) { -// return errDialFailed -// } -// <-wait -// continue -// } -// defer dialsync.Unlock(p) -// c, err := actuallyDial(p) -// if err != nil { -// dialbackoff.AddBackoff(p) -// continue -// } -// dialbackoff.Clear(p) -// } -// - -type dialbackoff struct { - entries map[peer.ID]*backoffPeer - lock sync.RWMutex -} - -type backoffPeer struct { - tries int - until time.Time -} - -func (db *dialbackoff) init() { - if db.entries == nil { - db.entries = make(map[peer.ID]*backoffPeer) - } -} - -// Backoff returns whether the client should backoff from dialing -// peer p -func (db *dialbackoff) Backoff(p peer.ID) (backoff bool) { - db.lock.Lock() - defer db.lock.Unlock() - db.init() - bp, found := db.entries[p] - if found && time.Now().Before(bp.until) { - return true - } - - return false -} - -const baseBackoffTime = time.Second * 5 -const maxBackoffTime = time.Minute * 5 - -// AddBackoff lets other nodes know that we've entered backoff with -// peer p, so dialers should not wait unnecessarily. We still will -// attempt to dial with one goroutine, in case we get through. -func (db *dialbackoff) AddBackoff(p peer.ID) { - db.lock.Lock() - defer db.lock.Unlock() - db.init() - bp, ok := db.entries[p] - if !ok { - db.entries[p] = &backoffPeer{ - tries: 1, - until: time.Now().Add(baseBackoffTime), - } - return - } - - expTimeAdd := time.Second * time.Duration(bp.tries*bp.tries) - if expTimeAdd > maxBackoffTime { - expTimeAdd = maxBackoffTime - } - bp.until = time.Now().Add(baseBackoffTime + expTimeAdd) - bp.tries++ -} - -// Clear removes a backoff record. Clients should call this after a -// successful Dial. -func (db *dialbackoff) Clear(p peer.ID) { - db.lock.Lock() - defer db.lock.Unlock() - db.init() - delete(db.entries, p) -} - -// Dial connects to a peer. -// -// The idea is that the client of Swarm does not need to know what network -// the connection will happen over. Swarm can use whichever it choses. -// This allows us to use various transport protocols, do NAT traversal/relay, -// etc. to achive connection. -func (s *Swarm) Dial(ctx context.Context, p peer.ID) (*Conn, error) { - var logdial = lgbl.Dial("swarm", s.LocalPeer(), p, nil, nil) - if p == s.local { - log.Event(ctx, "swarmDialSelf", logdial) - return nil, ErrDialToSelf - } - - return s.gatedDialAttempt(ctx, p) -} - -func (s *Swarm) bestConnectionToPeer(p peer.ID) *Conn { - cs := s.ConnectionsToPeer(p) - for _, conn := range cs { - if conn != nil { // dump out the first one we find. (TODO pick better) - return conn - } - } - return nil -} - -// gatedDialAttempt is an attempt to dial a node. It is gated by the swarm's -// dial synchronization systems: dialsync and dialbackoff. -func (s *Swarm) gatedDialAttempt(ctx context.Context, p peer.ID) (*Conn, error) { - defer log.EventBegin(ctx, "swarmDialAttemptSync", p).Done() - - // check if we already have an open connection first - conn := s.bestConnectionToPeer(p) - if conn != nil { - return conn, nil - } - - // if this peer has been backed off, lets get out of here - if s.backf.Backoff(p) { - log.Event(ctx, "swarmDialBackoff", p) - return nil, ErrDialBackoff - } - - return s.dsync.DialLock(ctx, p) -} - -// doDial is an ugly shim method to retain all the logging and backoff logic -// of the old dialsync code -func (s *Swarm) doDial(ctx context.Context, p peer.ID) (*Conn, error) { - var logdial = lgbl.Dial("swarm", s.LocalPeer(), p, nil, nil) - // ok, we have been charged to dial! let's do it. - // if it succeeds, dial will add the conn to the swarm itself. - defer log.EventBegin(ctx, "swarmDialAttemptStart", logdial).Done() - ctxT, cancel := context.WithTimeout(ctx, s.dialT) - conn, err := s.dial(ctxT, p) - cancel() - log.Debugf("dial end %s", conn) - if err != nil { - log.Event(ctx, "swarmDialBackoffAdd", logdial) - s.backf.AddBackoff(p) // let others know to backoff - - // ok, we failed. try again. (if loop is done, our error is output) - return nil, fmt.Errorf("dial attempt failed: %s", err) - } - log.Event(ctx, "swarmDialBackoffClear", logdial) - s.backf.Clear(p) // okay, no longer need to backoff - return conn, nil -} - -// dial is the actual swarm's dial logic, gated by Dial. -func (s *Swarm) dial(ctx context.Context, p peer.ID) (*Conn, error) { - var logdial = lgbl.Dial("swarm", s.LocalPeer(), p, nil, nil) - if p == s.local { - log.Event(ctx, "swarmDialDoDialSelf", logdial) - return nil, ErrDialToSelf - } - defer log.EventBegin(ctx, "swarmDialDo", logdial).Done() - logdial["dial"] = "failure" // start off with failure. set to "success" at the end. - - sk := s.peers.PrivKey(s.local) - logdial["encrypted"] = (sk != nil) // log wether this will be an encrypted dial or not. - if sk == nil { - // fine for sk to be nil, just log. - log.Debug("Dial not given PrivateKey, so WILL NOT SECURE conn.") - } - - ila, _ := s.InterfaceListenAddresses() - subtractFilter := addrutil.SubtractFilter(append(ila, s.peers.Addrs(s.local)...)...) - - // get live channel of addresses for peer, filtered by the given filters - /* - remoteAddrChan := s.peers.AddrsChan(ctx, p, - addrutil.AddrUsableFilter, - subtractFilter, - s.Filters.AddrBlocked) - */ - - ////// - /* - This code is temporary, the peerstore can currently provide - a channel as an interface for receiving addresses, but more thought - needs to be put into the execution. For now, this allows us to use - the improved rate limiter, while maintaining the outward behaviour - that we previously had (halting a dial when we run out of addrs) - */ - paddrs := s.peers.Addrs(p) - goodAddrs := addrutil.FilterAddrs(paddrs, - addrutil.AddrUsableFunc, - subtractFilter, - addrutil.FilterNeg(s.Filters.AddrBlocked), - ) - remoteAddrChan := make(chan ma.Multiaddr, len(goodAddrs)) - for _, a := range goodAddrs { - remoteAddrChan <- a - } - close(remoteAddrChan) - ///////// - - // try to get a connection to any addr - connC, err := s.dialAddrs(ctx, p, remoteAddrChan) - if err != nil { - logdial["error"] = err - return nil, err - } - logdial["netconn"] = lgbl.NetConn(connC) - - // ok try to setup the new connection. - defer log.EventBegin(ctx, "swarmDialDoSetup", logdial, lgbl.NetConn(connC)).Done() - swarmC, err := dialConnSetup(ctx, s, connC) - if err != nil { - logdial["error"] = err - connC.Close() // close the connection. didn't work out :( - return nil, err - } - - logdial["dial"] = "success" - return swarmC, nil -} - -func (s *Swarm) dialAddrs(ctx context.Context, p peer.ID, remoteAddrs <-chan ma.Multiaddr) (iconn.Conn, error) { - log.Debugf("%s swarm dialing %s %s", s.local, p, remoteAddrs) - - ctx, cancel := context.WithCancel(ctx) - defer cancel() // cancel work when we exit func - - // use a single response type instead of errs and conns, reduces complexity *a ton* - respch := make(chan dialResult) - - defaultDialFail := fmt.Errorf("failed to dial %s (default failure)", p) - exitErr := defaultDialFail - - var active int - for { - select { - case addr, ok := <-remoteAddrs: - if !ok { - remoteAddrs = nil - if active == 0 { - return nil, exitErr - } - continue - } - - s.limitedDial(ctx, p, addr, respch) - active++ - case <-ctx.Done(): - if exitErr == defaultDialFail { - exitErr = ctx.Err() - } - return nil, exitErr - case resp := <-respch: - active-- - if resp.Err != nil { - log.Info("got error on dial: ", resp.Err) - // Errors are normal, lots of dials will fail - exitErr = resp.Err - - if remoteAddrs == nil && active == 0 { - return nil, exitErr - } - } else if resp.Conn != nil { - return resp.Conn, nil - } - } - } -} - -// limitedDial will start a dial to the given peer when -// it is able, respecting the various different types of rate -// limiting that occur without using extra goroutines per addr -func (s *Swarm) limitedDial(ctx context.Context, p peer.ID, a ma.Multiaddr, resp chan dialResult) { - s.limiter.AddDialJob(&dialJob{ - addr: a, - peer: p, - resp: resp, - ctx: ctx, - }) -} - -func (s *Swarm) dialAddr(ctx context.Context, p peer.ID, addr ma.Multiaddr) (iconn.Conn, error) { - log.Debugf("%s swarm dialing %s %s", s.local, p, addr) - - connC, err := s.dialer.Dial(ctx, addr, p) - if err != nil { - return nil, fmt.Errorf("%s --> %s dial attempt failed: %s", s.local, p, err) - } - - // if the connection is not to whom we thought it would be... - remotep := connC.RemotePeer() - if remotep != p { - connC.Close() - _, err := connC.Read(nil) // should return any potential errors (ex: from secio) - return nil, fmt.Errorf("misdial to %s through %s (got %s): %s", p, addr, remotep, err) - } - - // if the connection is to ourselves... - // this can happen TONS when Loopback addrs are advertized. - // (this should be caught by two checks above, but let's just make sure.) - if remotep == s.local { - connC.Close() - return nil, fmt.Errorf("misdial to %s through %s (got self)", p, addr) - } - - // success! we got one! - return connC, nil -} - -var ConnSetupTimeout = time.Minute * 5 - -// dialConnSetup is the setup logic for a connection from the dial side. it -// needs to add the Conn to the StreamSwarm, then run newConnSetup -func dialConnSetup(ctx context.Context, s *Swarm, connC iconn.Conn) (*Conn, error) { - - deadline, ok := ctx.Deadline() - if !ok { - deadline = time.Now().Add(ConnSetupTimeout) - } - - if err := connC.SetDeadline(deadline); err != nil { - return nil, err - } - - psC, err := s.swarm.AddConn(connC) - if err != nil { - // connC is closed by caller if we fail. - return nil, fmt.Errorf("failed to add conn to ps.Swarm: %s", err) - } - - // ok try to setup the new connection. (newConnSetup will add to group) - swarmC, err := s.newConnSetup(ctx, psC) - if err != nil { - psC.Close() // we need to make sure psC is Closed. - return nil, err - } - - if err := connC.SetDeadline(time.Time{}); err != nil { - log.Error("failed to reset connection deadline after setup: ", err) - return nil, err - } - - return swarmC, err -} diff --git a/p2p/net/swarm/swarm_listen.go b/p2p/net/swarm/swarm_listen.go deleted file mode 100644 index d76d1e640efc7f9dec2c189b1d9e949f4eea748d..0000000000000000000000000000000000000000 --- a/p2p/net/swarm/swarm_listen.go +++ /dev/null @@ -1,170 +0,0 @@ -package swarm - -import ( - "context" - "fmt" - - ps "github.com/jbenet/go-peerstream" - conn "github.com/libp2p/go-libp2p-conn" - iconn "github.com/libp2p/go-libp2p-interface-conn" - lgbl "github.com/libp2p/go-libp2p-loggables" - mconn "github.com/libp2p/go-libp2p-metrics/conn" - inet "github.com/libp2p/go-libp2p-net" - transport "github.com/libp2p/go-libp2p-transport" - ma "github.com/multiformats/go-multiaddr" -) - -func (s *Swarm) AddListenAddr(a ma.Multiaddr) error { - tpt := s.transportForAddr(a) - if tpt == nil { - return fmt.Errorf("no transport for address: %s", a) - } - - d, err := tpt.Dialer(a, transport.TimeoutOpt(DialTimeout), transport.ReusePorts) - if err != nil { - return err - } - - s.dialer.AddDialer(d) - - list, err := tpt.Listen(a) - if err != nil { - return err - } - - err = s.addListener(list) - if err != nil { - return err - } - - return nil -} - -// Open listeners and reuse-dialers for the given addresses -func (s *Swarm) setupInterfaces(addrs []ma.Multiaddr) error { - errs := make([]error, len(addrs)) - var succeeded int - for i, a := range addrs { - if err := s.AddListenAddr(a); err != nil { - errs[i] = err - } else { - succeeded++ - } - } - - for i, e := range errs { - if e != nil { - log.Warning("listen on %s failed: %s", addrs[i], errs[i]) - } - } - - if succeeded == 0 && len(addrs) > 0 { - return fmt.Errorf("failed to listen on any addresses: %s", errs) - } - - return nil -} - -func (s *Swarm) transportForAddr(a ma.Multiaddr) transport.Transport { - for _, t := range s.transports { - if t.Matches(a) { - return t - } - } - - return nil -} - -func (s *Swarm) addListener(tptlist transport.Listener) error { - - sk := s.peers.PrivKey(s.local) - if sk == nil { - // may be fine for sk to be nil, just log a warning. - log.Warning("Listener not given PrivateKey, so WILL NOT SECURE conns.") - } - - list, err := conn.WrapTransportListener(s.Context(), tptlist, s.local, sk) - if err != nil { - return err - } - - list.SetAddrFilters(s.Filters) - - if cw, ok := list.(conn.ListenerConnWrapper); ok && s.bwc != nil { - cw.SetConnWrapper(func(c transport.Conn) transport.Conn { - return mconn.WrapConn(s.bwc, c) - }) - } - - return s.addConnListener(list) -} - -func (s *Swarm) addConnListener(list iconn.Listener) error { - // AddListener to the peerstream Listener. this will begin accepting connections - // and streams! - sl, err := s.swarm.AddListener(list) - if err != nil { - return err - } - log.Debugf("Swarm Listeners at %s", s.ListenAddresses()) - - maddr := list.Multiaddr() - - // signal to our notifiees on successful conn. - s.notifyAll(func(n inet.Notifiee) { - n.Listen((*Network)(s), maddr) - }) - - // go consume peerstream's listen accept errors. note, these ARE errors. - // they may be killing the listener, and if we get _any_ we should be - // fixing this in our conn.Listener (to ignore them or handle them - // differently.) - go func(ctx context.Context, sl *ps.Listener) { - - // signal to our notifiees closing - defer s.notifyAll(func(n inet.Notifiee) { - n.ListenClose((*Network)(s), maddr) - }) - - for { - select { - case err, more := <-sl.AcceptErrors(): - if !more { - return - } - log.Warningf("swarm listener accept error: %s", err) - case <-ctx.Done(): - return - } - } - }(s.Context(), sl) - - return nil -} - -// connHandler is called by the StreamSwarm whenever a new connection is added -// here we configure it slightly. Note that this is sequential, so if anything -// will take a while do it in a goroutine. -// See https://godoc.org/github.com/jbenet/go-peerstream for more information -func (s *Swarm) connHandler(c *ps.Conn) *Conn { - ctx := context.Background() - // this context is for running the handshake, which -- when receiveing connections - // -- we have no bound on beyond what the transport protocol bounds it at. - // note that setup + the handshake are bounded by underlying io. - // (i.e. if TCP or UDP disconnects (or the swarm closes), we're done. - // Q: why not have a shorter handshake? think about an HTTP server on really slow conns. - // as long as the conn is live (TCP says its online), it tries its best. we follow suit.) - - sc, err := s.newConnSetup(ctx, c) - if err != nil { - log.Debug(err) - log.Event(ctx, "newConnHandlerDisconnect", lgbl.NetConn(c.NetConn()), lgbl.Error(err)) - c.Close() // boom. close it. - return nil - } - - // if a peer dials us, remove from dial backoff. - s.backf.Clear(sc.RemotePeer()) - - return sc -} diff --git a/p2p/net/swarm/swarm_net.go b/p2p/net/swarm/swarm_net.go deleted file mode 100644 index 90356a0fc7e37ddf3daa5a58be0d53d1bd3fc08c..0000000000000000000000000000000000000000 --- a/p2p/net/swarm/swarm_net.go +++ /dev/null @@ -1,170 +0,0 @@ -package swarm - -import ( - "context" - "fmt" - - "github.com/jbenet/goprocess" - metrics "github.com/libp2p/go-libp2p-metrics" - inet "github.com/libp2p/go-libp2p-net" - peer "github.com/libp2p/go-libp2p-peer" - pstore "github.com/libp2p/go-libp2p-peerstore" - ma "github.com/multiformats/go-multiaddr" -) - -// Network implements the inet.Network interface. -// It is simply a swarm, with a few different functions -// to implement inet.Network. -type Network Swarm - -// NewNetwork constructs a new network and starts listening on given addresses. -func NewNetwork(ctx context.Context, listen []ma.Multiaddr, local peer.ID, - peers pstore.Peerstore, bwc metrics.Reporter) (*Network, error) { - - s, err := NewSwarm(ctx, listen, local, peers, bwc) - if err != nil { - return nil, err - } - - return (*Network)(s), nil -} - -// DialPeer attempts to establish a connection to a given peer. -// Respects the context. -func (n *Network) DialPeer(ctx context.Context, p peer.ID) (inet.Conn, error) { - log.Debugf("[%s] network dialing peer [%s]", n.local, p) - sc, err := n.Swarm().Dial(ctx, p) - if err != nil { - return nil, err - } - - log.Debugf("network for %s finished dialing %s", n.local, p) - return inet.Conn(sc), nil -} - -// Process returns the network's Process -func (n *Network) Process() goprocess.Process { - return n.proc -} - -// Swarm returns the network's peerstream.Swarm -func (n *Network) Swarm() *Swarm { - return (*Swarm)(n) -} - -// LocalPeer the network's LocalPeer -func (n *Network) LocalPeer() peer.ID { - return n.Swarm().LocalPeer() -} - -// Peers returns the known peer IDs from the Peerstore -func (n *Network) Peers() []peer.ID { - return n.Swarm().Peers() -} - -// Peerstore returns the Peerstore, which tracks known peers -func (n *Network) Peerstore() pstore.Peerstore { - return n.Swarm().peers -} - -// Conns returns the connected peers -func (n *Network) Conns() []inet.Conn { - conns1 := n.Swarm().Connections() - out := make([]inet.Conn, len(conns1)) - for i, c := range conns1 { - out[i] = inet.Conn(c) - } - return out -} - -// ConnsToPeer returns the connections in this Netowrk for given peer. -func (n *Network) ConnsToPeer(p peer.ID) []inet.Conn { - conns1 := n.Swarm().ConnectionsToPeer(p) - out := make([]inet.Conn, len(conns1)) - for i, c := range conns1 { - out[i] = inet.Conn(c) - } - return out -} - -// ClosePeer connection to peer -func (n *Network) ClosePeer(p peer.ID) error { - return n.Swarm().CloseConnection(p) -} - -// close is the real teardown function -func (n *Network) close() error { - return n.Swarm().Close() -} - -// Close calls the ContextCloser func -func (n *Network) Close() error { - return n.Swarm().proc.Close() -} - -// Listen tells the network to start listening on given multiaddrs. -func (n *Network) Listen(addrs ...ma.Multiaddr) error { - return n.Swarm().Listen(addrs...) -} - -// ListenAddresses returns a list of addresses at which this network listens. -func (n *Network) ListenAddresses() []ma.Multiaddr { - return n.Swarm().ListenAddresses() -} - -// InterfaceListenAddresses returns a list of addresses at which this network -// listens. It expands "any interface" addresses (/ip4/0.0.0.0, /ip6/::) to -// use the known local interfaces. -func (n *Network) InterfaceListenAddresses() ([]ma.Multiaddr, error) { - return n.Swarm().InterfaceListenAddresses() -} - -// Connectedness returns a state signaling connection capabilities -// For now only returns Connected || NotConnected. Expand into more later. -func (n *Network) Connectedness(p peer.ID) inet.Connectedness { - if n.Swarm().HaveConnsToPeer(p) { - return inet.Connected - } - return inet.NotConnected -} - -// NewStream returns a new stream to given peer p. -// If there is no connection to p, attempts to create one. -func (n *Network) NewStream(ctx context.Context, p peer.ID) (inet.Stream, error) { - log.Debugf("[%s] network opening stream to peer [%s]", n.local, p) - s, err := n.Swarm().NewStreamWithPeer(ctx, p) - if err != nil { - return nil, err - } - - return inet.Stream(s), nil -} - -// SetStreamHandler sets the protocol handler on the Network's Muxer. -// This operation is threadsafe. -func (n *Network) SetStreamHandler(h inet.StreamHandler) { - n.Swarm().SetStreamHandler(h) -} - -// SetConnHandler sets the conn handler on the Network. -// This operation is threadsafe. -func (n *Network) SetConnHandler(h inet.ConnHandler) { - n.Swarm().SetConnHandler(func(c *Conn) { - h(inet.Conn(c)) - }) -} - -// String returns a string representation of Network. -func (n *Network) String() string { - return fmt.Sprintf("", n.LocalPeer()) -} - -// Notify signs up Notifiee to receive signals when events happen -func (n *Network) Notify(f inet.Notifiee) { - n.Swarm().Notify(f) -} - -// StopNotify unregisters Notifiee fromr receiving signals -func (n *Network) StopNotify(f inet.Notifiee) { - n.Swarm().StopNotify(f) -} diff --git a/p2p/net/swarm/swarm_net_test.go b/p2p/net/swarm/swarm_net_test.go deleted file mode 100644 index ebf3bb82a22d34d5a0f661782d16d87bc32df3e7..0000000000000000000000000000000000000000 --- a/p2p/net/swarm/swarm_net_test.go +++ /dev/null @@ -1,167 +0,0 @@ -package swarm_test - -import ( - "fmt" - "testing" - "time" - - "context" - inet "github.com/libp2p/go-libp2p-net" - testutil "github.com/libp2p/go-libp2p/p2p/test/util" -) - -// TestConnectednessCorrect starts a few networks, connects a few -// and tests Connectedness value is correct. -func TestConnectednessCorrect(t *testing.T) { - - ctx := context.Background() - - nets := make([]inet.Network, 4) - for i := 0; i < 4; i++ { - nets[i] = testutil.GenSwarmNetwork(t, ctx) - } - - // connect 0-1, 0-2, 0-3, 1-2, 2-3 - - dial := func(a, b inet.Network) { - testutil.DivulgeAddresses(b, a) - if _, err := a.DialPeer(ctx, b.LocalPeer()); err != nil { - t.Fatalf("Failed to dial: %s", err) - } - } - - dial(nets[0], nets[1]) - dial(nets[0], nets[3]) - dial(nets[1], nets[2]) - dial(nets[3], nets[2]) - - // The notifications for new connections get sent out asynchronously. - // There is the potential for a race condition here, so we sleep to ensure - // that they have been received. - time.Sleep(time.Millisecond * 100) - - // test those connected show up correctly - - // test connected - expectConnectedness(t, nets[0], nets[1], inet.Connected) - expectConnectedness(t, nets[0], nets[3], inet.Connected) - expectConnectedness(t, nets[1], nets[2], inet.Connected) - expectConnectedness(t, nets[3], nets[2], inet.Connected) - - // test not connected - expectConnectedness(t, nets[0], nets[2], inet.NotConnected) - expectConnectedness(t, nets[1], nets[3], inet.NotConnected) - - if len(nets[0].Peers()) != 2 { - t.Fatal("expected net 0 to have two peers") - } - - if len(nets[2].Conns()) != 2 { - t.Fatal("expected net 2 to have two conns") - } - - if len(nets[1].ConnsToPeer(nets[3].LocalPeer())) != 0 { - t.Fatal("net 1 should have no connections to net 3") - } - - if err := nets[2].ClosePeer(nets[1].LocalPeer()); err != nil { - t.Fatal(err) - } - - time.Sleep(time.Millisecond * 50) - - expectConnectedness(t, nets[2], nets[1], inet.NotConnected) - - for _, n := range nets { - n.Close() - } - - for _, n := range nets { - <-n.Process().Closed() - } -} - -func expectConnectedness(t *testing.T, a, b inet.Network, expected inet.Connectedness) { - es := "%s is connected to %s, but Connectedness incorrect. %s %s %s" - atob := a.Connectedness(b.LocalPeer()) - btoa := b.Connectedness(a.LocalPeer()) - if atob != expected { - t.Errorf(es, a, b, printConns(a), printConns(b), atob) - } - - // test symmetric case - if btoa != expected { - t.Errorf(es, b, a, printConns(b), printConns(a), btoa) - } -} - -func printConns(n inet.Network) string { - s := fmt.Sprintf("Connections in %s:\n", n) - for _, c := range n.Conns() { - s = s + fmt.Sprintf("- %s\n", c) - } - return s -} - -func TestNetworkOpenStream(t *testing.T) { - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - - nets := make([]inet.Network, 4) - for i := 0; i < 4; i++ { - nets[i] = testutil.GenSwarmNetwork(t, ctx) - } - - dial := func(a, b inet.Network) { - testutil.DivulgeAddresses(b, a) - if _, err := a.DialPeer(ctx, b.LocalPeer()); err != nil { - t.Fatalf("Failed to dial: %s", err) - } - } - - dial(nets[0], nets[1]) - dial(nets[0], nets[3]) - dial(nets[1], nets[2]) - - done := make(chan bool) - nets[1].SetStreamHandler(func(s inet.Stream) { - defer close(done) - defer s.Close() - - buf := make([]byte, 10) - _, err := s.Read(buf) - if err != nil { - t.Error(err) - return - } - if string(buf) != "hello ipfs" { - t.Error("got wrong message") - } - }) - - s, err := nets[0].NewStream(ctx, nets[1].LocalPeer()) - if err != nil { - t.Fatal(err) - } - - _, err = s.Write([]byte("hello ipfs")) - if err != nil { - t.Fatal(err) - } - - err = s.Close() - if err != nil { - t.Fatal(err) - } - - select { - case <-done: - case <-time.After(time.Millisecond * 100): - t.Fatal("timed out waiting on stream") - } - - _, err = nets[1].NewStream(ctx, nets[3].LocalPeer()) - if err == nil { - t.Fatal("expected stream open 1->3 to fail") - } -} diff --git a/p2p/net/swarm/swarm_notif_test.go b/p2p/net/swarm/swarm_notif_test.go deleted file mode 100644 index d9e9df62a59ba47b9445812814b378813c9e9054..0000000000000000000000000000000000000000 --- a/p2p/net/swarm/swarm_notif_test.go +++ /dev/null @@ -1,215 +0,0 @@ -package swarm - -import ( - "testing" - "time" - - "context" - inet "github.com/libp2p/go-libp2p-net" - peer "github.com/libp2p/go-libp2p-peer" - ma "github.com/multiformats/go-multiaddr" -) - -func streamsSame(a, b inet.Stream) bool { - sa := a.(*Stream) - sb := b.(*Stream) - return sa.Stream() == sb.Stream() -} - -func TestNotifications(t *testing.T) { - ctx := context.Background() - swarms := makeSwarms(ctx, t, 5) - defer func() { - for _, s := range swarms { - s.Close() - } - }() - - timeout := 5 * time.Second - - // signup notifs - notifiees := make([]*netNotifiee, len(swarms)) - for i, swarm := range swarms { - n := newNetNotifiee() - swarm.Notify(n) - notifiees[i] = n - } - - connectSwarms(t, ctx, swarms) - - <-time.After(time.Millisecond) - // should've gotten 5 by now. - - // test everyone got the correct connection opened calls - for i, s := range swarms { - n := notifiees[i] - notifs := make(map[peer.ID][]inet.Conn) - for j, s2 := range swarms { - if i == j { - continue - } - - // this feels a little sketchy, but its probably okay - for len(s.ConnectionsToPeer(s2.LocalPeer())) != len(notifs[s2.LocalPeer()]) { - select { - case c := <-n.connected: - nfp := notifs[c.RemotePeer()] - notifs[c.RemotePeer()] = append(nfp, c) - case <-time.After(timeout): - t.Fatal("timeout") - } - } - } - - for p, cons := range notifs { - expect := s.ConnectionsToPeer(p) - if len(expect) != len(cons) { - t.Fatal("got different number of connections") - } - - for _, c := range cons { - var found bool - for _, c2 := range expect { - if c == c2 { - found = true - break - } - } - - if !found { - t.Fatal("connection not found!") - } - } - } - } - - complement := func(c inet.Conn) (*Swarm, *netNotifiee, *Conn) { - for i, s := range swarms { - for _, c2 := range s.Connections() { - if c.LocalMultiaddr().Equal(c2.RemoteMultiaddr()) && - c2.LocalMultiaddr().Equal(c.RemoteMultiaddr()) { - return s, notifiees[i], c2 - } - } - } - t.Fatal("complementary conn not found", c) - return nil, nil, nil - } - - testOCStream := func(n *netNotifiee, s inet.Stream) { - var s2 inet.Stream - select { - case s2 = <-n.openedStream: - t.Log("got notif for opened stream") - case <-time.After(timeout): - t.Fatal("timeout") - } - if !streamsSame(s, s2) { - t.Fatal("got incorrect stream", s.Conn(), s2.Conn()) - } - - select { - case s2 = <-n.closedStream: - t.Log("got notif for closed stream") - case <-time.After(timeout): - t.Fatal("timeout") - } - if !streamsSame(s, s2) { - t.Fatal("got incorrect stream", s.Conn(), s2.Conn()) - } - } - - streams := make(chan inet.Stream) - for _, s := range swarms { - s.SetStreamHandler(func(s inet.Stream) { - streams <- s - s.Close() - }) - } - - // open a streams in each conn - for i, s := range swarms { - for _, c := range s.Connections() { - _, n2, _ := complement(c) - - st1, err := c.NewStream() - if err != nil { - t.Error(err) - } else { - st1.Write([]byte("hello")) - st1.Close() - testOCStream(notifiees[i], st1) - st2 := <-streams - testOCStream(n2, st2) - } - } - } - - // close conns - for i, s := range swarms { - n := notifiees[i] - for _, c := range s.Connections() { - _, n2, c2 := complement(c) - c.Close() - c2.Close() - - var c3, c4 inet.Conn - select { - case c3 = <-n.disconnected: - case <-time.After(timeout): - t.Fatal("timeout") - } - if c != c3 { - t.Fatal("got incorrect conn", c, c3) - } - - select { - case c4 = <-n2.disconnected: - case <-time.After(timeout): - t.Fatal("timeout") - } - if c2 != c4 { - t.Fatal("got incorrect conn", c, c2) - } - } - } -} - -type netNotifiee struct { - listen chan ma.Multiaddr - listenClose chan ma.Multiaddr - connected chan inet.Conn - disconnected chan inet.Conn - openedStream chan inet.Stream - closedStream chan inet.Stream -} - -func newNetNotifiee() *netNotifiee { - return &netNotifiee{ - listen: make(chan ma.Multiaddr), - listenClose: make(chan ma.Multiaddr), - connected: make(chan inet.Conn), - disconnected: make(chan inet.Conn), - openedStream: make(chan inet.Stream), - closedStream: make(chan inet.Stream), - } -} - -func (nn *netNotifiee) Listen(n inet.Network, a ma.Multiaddr) { - nn.listen <- a -} -func (nn *netNotifiee) ListenClose(n inet.Network, a ma.Multiaddr) { - nn.listenClose <- a -} -func (nn *netNotifiee) Connected(n inet.Network, v inet.Conn) { - nn.connected <- v -} -func (nn *netNotifiee) Disconnected(n inet.Network, v inet.Conn) { - nn.disconnected <- v -} -func (nn *netNotifiee) OpenedStream(n inet.Network, v inet.Stream) { - nn.openedStream <- v -} -func (nn *netNotifiee) ClosedStream(n inet.Network, v inet.Stream) { - nn.closedStream <- v -} diff --git a/p2p/net/swarm/swarm_stream.go b/p2p/net/swarm/swarm_stream.go deleted file mode 100644 index ef3ab3c8fd98d65a89c8031ec95bc09e28daccae..0000000000000000000000000000000000000000 --- a/p2p/net/swarm/swarm_stream.go +++ /dev/null @@ -1,68 +0,0 @@ -package swarm - -import ( - inet "github.com/libp2p/go-libp2p-net" - protocol "github.com/libp2p/go-libp2p-protocol" - - ps "github.com/jbenet/go-peerstream" -) - -// Stream is a wrapper around a ps.Stream that exposes a way to get -// our Conn and Swarm (instead of just the ps.Conn and ps.Swarm) -type Stream struct { - stream *ps.Stream - protocol protocol.ID -} - -// Stream returns the underlying peerstream.Stream -func (s *Stream) Stream() *ps.Stream { - return s.stream -} - -// Conn returns the Conn associated with this Stream, as an inet.Conn -func (s *Stream) Conn() inet.Conn { - return s.SwarmConn() -} - -// SwarmConn returns the Conn associated with this Stream, as a *Conn -func (s *Stream) SwarmConn() *Conn { - return (*Conn)(s.stream.Conn()) -} - -// Read reads bytes from a stream. -func (s *Stream) Read(p []byte) (n int, err error) { - return s.stream.Read(p) -} - -// Write writes bytes to a stream, flushing for each call. -func (s *Stream) Write(p []byte) (n int, err error) { - return s.stream.Write(p) -} - -// Close closes the stream, indicating this side is finished -// with the stream. -func (s *Stream) Close() error { - return s.stream.Close() -} - -func (s *Stream) Protocol() protocol.ID { - return s.protocol -} - -func (s *Stream) SetProtocol(p protocol.ID) { - s.protocol = p -} - -func wrapStream(pss *ps.Stream) *Stream { - return &Stream{ - stream: pss, - } -} - -func wrapStreams(st []*ps.Stream) []*Stream { - out := make([]*Stream, len(st)) - for i, s := range st { - out[i] = wrapStream(s) - } - return out -} diff --git a/p2p/net/swarm/swarm_test.go b/p2p/net/swarm/swarm_test.go deleted file mode 100644 index 9deb918f3cb2a329e990095cfced00674c12eb31..0000000000000000000000000000000000000000 --- a/p2p/net/swarm/swarm_test.go +++ /dev/null @@ -1,356 +0,0 @@ -package swarm - -import ( - "bytes" - "context" - "fmt" - "io" - "net" - "sync" - "testing" - "time" - - metrics "github.com/libp2p/go-libp2p-metrics" - inet "github.com/libp2p/go-libp2p-net" - peer "github.com/libp2p/go-libp2p-peer" - pstore "github.com/libp2p/go-libp2p-peerstore" - testutil "github.com/libp2p/go-testutil" - ma "github.com/multiformats/go-multiaddr" -) - -func EchoStreamHandler(stream inet.Stream) { - go func() { - defer stream.Close() - - // pull out the ipfs conn - c := stream.Conn() - log.Infof("%s ponging to %s", c.LocalPeer(), c.RemotePeer()) - - buf := make([]byte, 4) - - for { - if _, err := stream.Read(buf); err != nil { - if err != io.EOF { - log.Error("ping receive error:", err) - } - return - } - - if !bytes.Equal(buf, []byte("ping")) { - log.Errorf("ping receive error: ping != %s %v", buf, buf) - return - } - - if _, err := stream.Write([]byte("pong")); err != nil { - log.Error("pond send error:", err) - return - } - } - }() -} - -func makeDialOnlySwarm(ctx context.Context, t *testing.T) *Swarm { - id := testutil.RandIdentityOrFatal(t) - - peerstore := pstore.NewPeerstore() - peerstore.AddPubKey(id.ID(), id.PublicKey()) - peerstore.AddPrivKey(id.ID(), id.PrivateKey()) - - swarm, err := NewSwarm(ctx, nil, id.ID(), peerstore, metrics.NewBandwidthCounter()) - if err != nil { - t.Fatal(err) - } - - swarm.SetStreamHandler(EchoStreamHandler) - - return swarm -} - -func makeSwarms(ctx context.Context, t *testing.T, num int) []*Swarm { - swarms := make([]*Swarm, 0, num) - - for i := 0; i < num; i++ { - localnp := testutil.RandPeerNetParamsOrFatal(t) - - peerstore := pstore.NewPeerstore() - peerstore.AddPubKey(localnp.ID, localnp.PubKey) - peerstore.AddPrivKey(localnp.ID, localnp.PrivKey) - - addrs := []ma.Multiaddr{localnp.Addr} - swarm, err := NewSwarm(ctx, addrs, localnp.ID, peerstore, metrics.NewBandwidthCounter()) - if err != nil { - t.Fatal(err) - } - - swarm.SetStreamHandler(EchoStreamHandler) - swarms = append(swarms, swarm) - } - - return swarms -} - -func connectSwarms(t *testing.T, ctx context.Context, swarms []*Swarm) { - - var wg sync.WaitGroup - connect := func(s *Swarm, dst peer.ID, addr ma.Multiaddr) { - // TODO: make a DialAddr func. - s.peers.AddAddr(dst, addr, pstore.PermanentAddrTTL) - if _, err := s.Dial(ctx, dst); err != nil { - t.Fatal("error swarm dialing to peer", err) - } - wg.Done() - } - - log.Info("Connecting swarms simultaneously.") - for _, s1 := range swarms { - for _, s2 := range swarms { - if s2.local != s1.local { // don't connect to self. - wg.Add(1) - connect(s1, s2.LocalPeer(), s2.ListenAddresses()[0]) // try the first. - } - } - } - wg.Wait() - - for _, s := range swarms { - log.Infof("%s swarm routing table: %s", s.local, s.Peers()) - } -} - -func SubtestSwarm(t *testing.T, SwarmNum int, MsgNum int) { - // t.Skip("skipping for another test") - - ctx := context.Background() - swarms := makeSwarms(ctx, t, SwarmNum) - - // connect everyone - connectSwarms(t, ctx, swarms) - - // ping/pong - for _, s1 := range swarms { - log.Debugf("-------------------------------------------------------") - log.Debugf("%s ping pong round", s1.local) - log.Debugf("-------------------------------------------------------") - - _, cancel := context.WithCancel(ctx) - got := map[peer.ID]int{} - errChan := make(chan error, MsgNum*len(swarms)) - streamChan := make(chan *Stream, MsgNum) - - // send out "ping" x MsgNum to every peer - go func() { - defer close(streamChan) - - var wg sync.WaitGroup - send := func(p peer.ID) { - defer wg.Done() - - // first, one stream per peer (nice) - stream, err := s1.NewStreamWithPeer(ctx, p) - if err != nil { - errChan <- err - return - } - - // send out ping! - for k := 0; k < MsgNum; k++ { // with k messages - msg := "ping" - log.Debugf("%s %s %s (%d)", s1.local, msg, p, k) - if _, err := stream.Write([]byte(msg)); err != nil { - errChan <- err - continue - } - } - - // read it later - streamChan <- stream - } - - for _, s2 := range swarms { - if s2.local == s1.local { - continue // dont send to self... - } - - wg.Add(1) - go send(s2.local) - } - wg.Wait() - }() - - // receive "pong" x MsgNum from every peer - go func() { - defer close(errChan) - count := 0 - countShouldBe := MsgNum * (len(swarms) - 1) - for stream := range streamChan { // one per peer - defer stream.Close() - - // get peer on the other side - p := stream.Conn().RemotePeer() - - // receive pings - msgCount := 0 - msg := make([]byte, 4) - for k := 0; k < MsgNum; k++ { // with k messages - - // read from the stream - if _, err := stream.Read(msg); err != nil { - errChan <- err - continue - } - - if string(msg) != "pong" { - errChan <- fmt.Errorf("unexpected message: %s", msg) - continue - } - - log.Debugf("%s %s %s (%d)", s1.local, msg, p, k) - msgCount++ - } - - got[p] = msgCount - count += msgCount - } - - if count != countShouldBe { - errChan <- fmt.Errorf("count mismatch: %d != %d", count, countShouldBe) - } - }() - - // check any errors (blocks till consumer is done) - for err := range errChan { - if err != nil { - t.Error(err.Error()) - } - } - - log.Debugf("%s got pongs", s1.local) - if (len(swarms) - 1) != len(got) { - t.Errorf("got (%d) less messages than sent (%d).", len(got), len(swarms)) - } - - for p, n := range got { - if n != MsgNum { - t.Error("peer did not get all msgs", p, n, "/", MsgNum) - } - } - - cancel() - <-time.After(10 * time.Millisecond) - } - - for _, s := range swarms { - s.Close() - } -} - -func TestSwarm(t *testing.T) { - // t.Skip("skipping for another test") - t.Parallel() - - // msgs := 1000 - msgs := 100 - swarms := 5 - SubtestSwarm(t, swarms, msgs) -} - -func TestBasicSwarm(t *testing.T) { - // t.Skip("skipping for another test") - t.Parallel() - - msgs := 1 - swarms := 2 - SubtestSwarm(t, swarms, msgs) -} - -func TestConnHandler(t *testing.T) { - // t.Skip("skipping for another test") - t.Parallel() - - ctx := context.Background() - swarms := makeSwarms(ctx, t, 5) - - gotconn := make(chan struct{}, 10) - swarms[0].SetConnHandler(func(conn *Conn) { - gotconn <- struct{}{} - }) - - connectSwarms(t, ctx, swarms) - - <-time.After(time.Millisecond) - // should've gotten 5 by now. - - swarms[0].SetConnHandler(nil) - - expect := 4 - for i := 0; i < expect; i++ { - select { - case <-time.After(time.Second): - t.Fatal("failed to get connections") - case <-gotconn: - } - } - - select { - case <-gotconn: - t.Fatalf("should have connected to %d swarms, got an extra.", expect) - default: - } -} - -func TestAddrBlocking(t *testing.T) { - ctx := context.Background() - swarms := makeSwarms(ctx, t, 2) - - swarms[0].SetConnHandler(func(conn *Conn) { - t.Errorf("no connections should happen! -- %s", conn) - }) - - _, block, err := net.ParseCIDR("127.0.0.1/8") - if err != nil { - t.Fatal(err) - } - - swarms[1].Filters.AddDialFilter(block) - - swarms[1].peers.AddAddr(swarms[0].LocalPeer(), swarms[0].ListenAddresses()[0], pstore.PermanentAddrTTL) - _, err = swarms[1].Dial(ctx, swarms[0].LocalPeer()) - if err == nil { - t.Fatal("dial should have failed") - } - - swarms[0].peers.AddAddr(swarms[1].LocalPeer(), swarms[1].ListenAddresses()[0], pstore.PermanentAddrTTL) - _, err = swarms[0].Dial(ctx, swarms[1].LocalPeer()) - if err == nil { - t.Fatal("dial should have failed") - } -} - -func TestFilterBounds(t *testing.T) { - ctx := context.Background() - swarms := makeSwarms(ctx, t, 2) - - conns := make(chan struct{}, 8) - swarms[0].SetConnHandler(func(conn *Conn) { - conns <- struct{}{} - }) - - // Address that we wont be dialing from - _, block, err := net.ParseCIDR("192.0.0.1/8") - if err != nil { - t.Fatal(err) - } - - // set filter on both sides, shouldnt matter - swarms[1].Filters.AddDialFilter(block) - swarms[0].Filters.AddDialFilter(block) - - connectSwarms(t, ctx, swarms) - - select { - case <-time.After(time.Second): - t.Fatal("should have gotten connection") - case <-conns: - t.Log("got connect") - } -} diff --git a/p2p/test/reconnects/reconnect_test.go b/p2p/test/reconnects/reconnect_test.go index e3249f57b197cec2ff0eb6c58aa1b15c921365e6..b72081ef356562429749ed3e41f96a77e9baab8b 100644 --- a/p2p/test/reconnects/reconnect_test.go +++ b/p2p/test/reconnects/reconnect_test.go @@ -14,7 +14,7 @@ import ( host "github.com/libp2p/go-libp2p-host" inet "github.com/libp2p/go-libp2p-net" protocol "github.com/libp2p/go-libp2p-protocol" - swarm "github.com/libp2p/go-libp2p/p2p/net/swarm" + swarm "github.com/libp2p/go-libp2p-swarm" testutil "github.com/libp2p/go-libp2p/p2p/test/util" ) diff --git a/p2p/test/util/util.go b/p2p/test/util/util.go index b02e989b698a93b9f0715064d020add77212507c..efe3d0d082f51eb02fe925304e9173acce5e8aa2 100644 --- a/p2p/test/util/util.go +++ b/p2p/test/util/util.go @@ -5,11 +5,11 @@ import ( "testing" bhost "github.com/libp2p/go-libp2p/p2p/host/basic" - swarm "github.com/libp2p/go-libp2p/p2p/net/swarm" metrics "github.com/libp2p/go-libp2p-metrics" inet "github.com/libp2p/go-libp2p-net" pstore "github.com/libp2p/go-libp2p-peerstore" + swarm "github.com/libp2p/go-libp2p-swarm" tu "github.com/libp2p/go-testutil" ma "github.com/multiformats/go-multiaddr" ) diff --git a/package.json b/package.json index 68236ed69a58cafcaa7c9964e12f0d3995ffe69d..bee3d6b28b5126296495f209a9fc9b89b5f532dc 100644 --- a/package.json +++ b/package.json @@ -242,6 +242,12 @@ "hash": "QmdML3R42PRSwnt46jSuEts9bHSqLctVYEjJqMR3UYV8ki", "name": "go-libp2p-host", "version": "1.1.0" + }, + { + "author": "whyrusleeping", + "hash": "QmeAfPWBWDQq9qjQ5oiWhaFs7oEsfB6FyEj5VxNdc2r34q", + "name": "go-libp2p-swarm", + "version": "1.0.0" } ], "gxVersion": "0.4.0",