From b6f19a5591fe987f8296cc6dd321447eeb87c0e7 Mon Sep 17 00:00:00 2001 From: Jeromy Date: Fri, 6 May 2016 12:39:08 -0700 Subject: [PATCH] don't execute cancelled jobs --- p2p/net/swarm/limiter.go | 13 +++++++ p2p/net/swarm/limiter_test.go | 64 +++++++++++++++++++++++++++++++---- 2 files changed, 70 insertions(+), 7 deletions(-) diff --git a/p2p/net/swarm/limiter.go b/p2p/net/swarm/limiter.go index fe8162d..7835fe5 100644 --- a/p2p/net/swarm/limiter.go +++ b/p2p/net/swarm/limiter.go @@ -24,6 +24,15 @@ type dialJob struct { success bool } +func (dj *dialJob) cancelled() bool { + select { + case <-dj.ctx.Done(): + return true + default: + return false + } +} + type dialLimiter struct { rllock sync.Mutex fdConsuming int @@ -116,6 +125,10 @@ func (dl *dialLimiter) AddDialJob(dj *dialJob) { // it held during the dial. func (dl *dialLimiter) executeDial(j *dialJob) { defer dl.finishedDial(j) + if j.cancelled() { + return + } + con, err := dl.dialFunc(j.ctx, j.peer, j.addr) select { case j.resp <- dialResult{Conn: con, Err: err}: diff --git a/p2p/net/swarm/limiter_test.go b/p2p/net/swarm/limiter_test.go index b5bbde8..28733c5 100644 --- a/p2p/net/swarm/limiter_test.go +++ b/p2p/net/swarm/limiter_test.go @@ -2,6 +2,7 @@ package swarm import ( "fmt" + "math/rand" "strconv" "testing" "time" @@ -75,13 +76,7 @@ func TestLimiterBasicDials(t *testing.T) { l := newDialLimiterWithParams(hangDialFunc(hang), concurrentFdDials, 4) - bads := []ma.Multiaddr{ - addrWithPort(t, 1), - addrWithPort(t, 2), - addrWithPort(t, 3), - addrWithPort(t, 4), - } - + bads := []ma.Multiaddr{addrWithPort(t, 1), addrWithPort(t, 2), addrWithPort(t, 3), addrWithPort(t, 4)} good := addrWithPort(t, 20) resch := make(chan dialResult) @@ -162,6 +157,7 @@ func TestFDLimiting(t *testing.T) { pid5 := peer.ID("testpeer5") utpaddr := mustAddr(t, "/ip4/127.0.0.1/udp/7777/utp") + // This should complete immediately since utp addresses arent blocked by fd rate limiting l.AddDialJob(&dialJob{ctx: ctx, peer: pid5, addr: utpaddr, resp: resch}) select { @@ -263,3 +259,57 @@ func TestTokenRedistribution(t *testing.T) { t.Fatal("should have gotten successful dial") } } + +func TestStressLimiter(t *testing.T) { + df := func(ctx context.Context, p peer.ID, a ma.Multiaddr) (conn.Conn, error) { + if tcpPortOver(a, 1000) { + return conn.Conn(nil), nil + } else { + time.Sleep(time.Millisecond * time.Duration(5+rand.Intn(100))) + return nil, fmt.Errorf("test bad dial") + } + } + + l := newDialLimiterWithParams(df, 20, 5) + + var bads []ma.Multiaddr + for i := 0; i < 100; i++ { + bads = append(bads, addrWithPort(t, i)) + } + + addresses := append(bads, addrWithPort(t, 2000)) + success := make(chan struct{}) + + for i := 0; i < 20; i++ { + go func(id peer.ID) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + resp := make(chan dialResult) + time.Sleep(time.Duration(rand.Intn(10)) * time.Millisecond) + for _, i := range rand.Perm(len(addresses)) { + l.AddDialJob(&dialJob{ + addr: addresses[i], + ctx: ctx, + peer: id, + resp: resp, + }) + } + + for res := range resp { + if res.Err == nil { + success <- struct{}{} + return + } + } + }(peer.ID(fmt.Sprintf("testpeer%d", i))) + } + + for i := 0; i < 20; i++ { + select { + case <-success: + case <-time.After(time.Second * 5): + t.Fatal("expected a success within five seconds") + } + } +} -- GitLab