Commit b6f19a55 authored by Jeromy's avatar Jeromy
Browse files

don't execute cancelled jobs

parent d899b07f
...@@ -24,6 +24,15 @@ type dialJob struct { ...@@ -24,6 +24,15 @@ type dialJob struct {
success bool success bool
} }
func (dj *dialJob) cancelled() bool {
select {
case <-dj.ctx.Done():
return true
default:
return false
}
}
type dialLimiter struct { type dialLimiter struct {
rllock sync.Mutex rllock sync.Mutex
fdConsuming int fdConsuming int
...@@ -116,6 +125,10 @@ func (dl *dialLimiter) AddDialJob(dj *dialJob) { ...@@ -116,6 +125,10 @@ func (dl *dialLimiter) AddDialJob(dj *dialJob) {
// it held during the dial. // it held during the dial.
func (dl *dialLimiter) executeDial(j *dialJob) { func (dl *dialLimiter) executeDial(j *dialJob) {
defer dl.finishedDial(j) defer dl.finishedDial(j)
if j.cancelled() {
return
}
con, err := dl.dialFunc(j.ctx, j.peer, j.addr) con, err := dl.dialFunc(j.ctx, j.peer, j.addr)
select { select {
case j.resp <- dialResult{Conn: con, Err: err}: case j.resp <- dialResult{Conn: con, Err: err}:
......
...@@ -2,6 +2,7 @@ package swarm ...@@ -2,6 +2,7 @@ package swarm
import ( import (
"fmt" "fmt"
"math/rand"
"strconv" "strconv"
"testing" "testing"
"time" "time"
...@@ -75,13 +76,7 @@ func TestLimiterBasicDials(t *testing.T) { ...@@ -75,13 +76,7 @@ func TestLimiterBasicDials(t *testing.T) {
l := newDialLimiterWithParams(hangDialFunc(hang), concurrentFdDials, 4) l := newDialLimiterWithParams(hangDialFunc(hang), concurrentFdDials, 4)
bads := []ma.Multiaddr{ bads := []ma.Multiaddr{addrWithPort(t, 1), addrWithPort(t, 2), addrWithPort(t, 3), addrWithPort(t, 4)}
addrWithPort(t, 1),
addrWithPort(t, 2),
addrWithPort(t, 3),
addrWithPort(t, 4),
}
good := addrWithPort(t, 20) good := addrWithPort(t, 20)
resch := make(chan dialResult) resch := make(chan dialResult)
...@@ -162,6 +157,7 @@ func TestFDLimiting(t *testing.T) { ...@@ -162,6 +157,7 @@ func TestFDLimiting(t *testing.T) {
pid5 := peer.ID("testpeer5") pid5 := peer.ID("testpeer5")
utpaddr := mustAddr(t, "/ip4/127.0.0.1/udp/7777/utp") utpaddr := mustAddr(t, "/ip4/127.0.0.1/udp/7777/utp")
// This should complete immediately since utp addresses arent blocked by fd rate limiting
l.AddDialJob(&dialJob{ctx: ctx, peer: pid5, addr: utpaddr, resp: resch}) l.AddDialJob(&dialJob{ctx: ctx, peer: pid5, addr: utpaddr, resp: resch})
select { select {
...@@ -263,3 +259,57 @@ func TestTokenRedistribution(t *testing.T) { ...@@ -263,3 +259,57 @@ func TestTokenRedistribution(t *testing.T) {
t.Fatal("should have gotten successful dial") t.Fatal("should have gotten successful dial")
} }
} }
func TestStressLimiter(t *testing.T) {
df := func(ctx context.Context, p peer.ID, a ma.Multiaddr) (conn.Conn, error) {
if tcpPortOver(a, 1000) {
return conn.Conn(nil), nil
} else {
time.Sleep(time.Millisecond * time.Duration(5+rand.Intn(100)))
return nil, fmt.Errorf("test bad dial")
}
}
l := newDialLimiterWithParams(df, 20, 5)
var bads []ma.Multiaddr
for i := 0; i < 100; i++ {
bads = append(bads, addrWithPort(t, i))
}
addresses := append(bads, addrWithPort(t, 2000))
success := make(chan struct{})
for i := 0; i < 20; i++ {
go func(id peer.ID) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
resp := make(chan dialResult)
time.Sleep(time.Duration(rand.Intn(10)) * time.Millisecond)
for _, i := range rand.Perm(len(addresses)) {
l.AddDialJob(&dialJob{
addr: addresses[i],
ctx: ctx,
peer: id,
resp: resp,
})
}
for res := range resp {
if res.Err == nil {
success <- struct{}{}
return
}
}
}(peer.ID(fmt.Sprintf("testpeer%d", i)))
}
for i := 0; i < 20; i++ {
select {
case <-success:
case <-time.After(time.Second * 5):
t.Fatal("expected a success within five seconds")
}
}
}
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment