Commit 1879bb06 authored by Juan Benet's avatar Juan Benet
Browse files

Merge pull request #1471 from heems/master

add transport from netsim and bandwidth to mocknet
parents dff9366e 536c183b
...@@ -7,13 +7,12 @@ ...@@ -7,13 +7,12 @@
package mocknet package mocknet
import ( import (
"io"
"time"
ic "github.com/ipfs/go-ipfs/p2p/crypto" ic "github.com/ipfs/go-ipfs/p2p/crypto"
host "github.com/ipfs/go-ipfs/p2p/host" host "github.com/ipfs/go-ipfs/p2p/host"
inet "github.com/ipfs/go-ipfs/p2p/net" inet "github.com/ipfs/go-ipfs/p2p/net"
peer "github.com/ipfs/go-ipfs/p2p/peer" peer "github.com/ipfs/go-ipfs/p2p/peer"
"io"
"time"
ma "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr" ma "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr"
) )
...@@ -59,13 +58,14 @@ type Mocknet interface { ...@@ -59,13 +58,14 @@ type Mocknet interface {
ConnectNets(inet.Network, inet.Network) (inet.Conn, error) ConnectNets(inet.Network, inet.Network) (inet.Conn, error)
DisconnectPeers(peer.ID, peer.ID) error DisconnectPeers(peer.ID, peer.ID) error
DisconnectNets(inet.Network, inet.Network) error DisconnectNets(inet.Network, inet.Network) error
LinkAll() error
} }
// LinkOptions are used to change aspects of the links. // LinkOptions are used to change aspects of the links.
// Sorry but they dont work yet :( // Sorry but they dont work yet :(
type LinkOptions struct { type LinkOptions struct {
Latency time.Duration Latency time.Duration
Bandwidth int // in bytes-per-second Bandwidth float64 // in bytes-per-second
// we can make these values distributions down the road. // we can make these values distributions down the road.
} }
......
package mocknet package mocknet
import ( import (
// "fmt"
"io" "io"
"sync" "sync"
"time"
inet "github.com/ipfs/go-ipfs/p2p/net" inet "github.com/ipfs/go-ipfs/p2p/net"
peer "github.com/ipfs/go-ipfs/p2p/peer" peer "github.com/ipfs/go-ipfs/p2p/peer"
...@@ -14,14 +16,17 @@ type link struct { ...@@ -14,14 +16,17 @@ type link struct {
mock *mocknet mock *mocknet
nets []*peernet nets []*peernet
opts LinkOptions opts LinkOptions
ratelimiter *ratelimiter
// this could have addresses on both sides. // this could have addresses on both sides.
sync.RWMutex sync.RWMutex
} }
func newLink(mn *mocknet, opts LinkOptions) *link { func newLink(mn *mocknet, opts LinkOptions) *link {
return &link{mock: mn, opts: opts} l := &link{mock: mn,
opts: opts,
ratelimiter: NewRatelimiter(opts.Bandwidth)}
return l
} }
func (l *link) newConnPair(dialer *peernet) (*conn, *conn) { func (l *link) newConnPair(dialer *peernet) (*conn, *conn) {
...@@ -57,8 +62,8 @@ func (l *link) newStreamPair() (*stream, *stream) { ...@@ -57,8 +62,8 @@ func (l *link) newStreamPair() (*stream, *stream) {
r1, w1 := io.Pipe() r1, w1 := io.Pipe()
r2, w2 := io.Pipe() r2, w2 := io.Pipe()
s1 := &stream{Reader: r1, Writer: w2} s1 := NewStream(w2, r1)
s2 := &stream{Reader: r2, Writer: w1} s2 := NewStream(w1, r2)
return s1, s2 return s1, s2
} }
...@@ -86,8 +91,17 @@ func (l *link) Peers() []peer.ID { ...@@ -86,8 +91,17 @@ func (l *link) Peers() []peer.ID {
func (l *link) SetOptions(o LinkOptions) { func (l *link) SetOptions(o LinkOptions) {
l.opts = o l.opts = o
l.ratelimiter.UpdateBandwidth(l.opts.Bandwidth)
} }
func (l *link) Options() LinkOptions { func (l *link) Options() LinkOptions {
return l.opts return l.opts
} }
func (l *link) GetLatency() time.Duration {
return l.opts.Latency
}
func (l *link) RateLimit(dataSize int) time.Duration {
return l.ratelimiter.Limit(dataSize)
}
...@@ -63,7 +63,7 @@ func TestNotifications(t *testing.T) { ...@@ -63,7 +63,7 @@ func TestNotifications(t *testing.T) {
} }
} }
if !found { if !found {
t.Error("connection not found") t.Error("connection not found", c1, len(expect), len(actual))
} }
} }
......
package mocknet package mocknet
import ( import (
"bytes"
"io" "io"
"time"
process "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/goprocess"
inet "github.com/ipfs/go-ipfs/p2p/net" inet "github.com/ipfs/go-ipfs/p2p/net"
) )
...@@ -11,9 +15,50 @@ type stream struct { ...@@ -11,9 +15,50 @@ type stream struct {
io.Reader io.Reader
io.Writer io.Writer
conn *conn conn *conn
toDeliver chan *transportObject
proc process.Process
}
type transportObject struct {
msg []byte
arrivalTime time.Time
}
func NewStream(w io.Writer, r io.Reader) *stream {
s := &stream{
Reader: r,
Writer: w,
toDeliver: make(chan *transportObject),
}
s.proc = process.WithTeardown(s.teardown)
s.proc.Go(s.transport)
return s
}
// How to handle errors with writes?
func (s *stream) Write(p []byte) (n int, err error) {
l := s.conn.link
delay := l.GetLatency() + l.RateLimit(len(p))
t := time.Now().Add(delay)
select {
case <-s.proc.Closing(): // bail out if we're closing.
return 0, io.ErrClosedPipe
case s.toDeliver <- &transportObject{msg: p, arrivalTime: t}:
}
return len(p), nil
} }
func (s *stream) Close() error { func (s *stream) Close() error {
return s.proc.Close()
}
// teardown shuts down the stream. it is called by s.proc.Close()
// after all the children of this s.proc (i.e. transport's proc)
// are done.
func (s *stream) teardown() error {
// at this point, no streams are writing.
s.conn.removeStream(s) s.conn.removeStream(s)
if r, ok := (s.Reader).(io.Closer); ok { if r, ok := (s.Reader).(io.Closer); ok {
r.Close() r.Close()
...@@ -30,3 +75,71 @@ func (s *stream) Close() error { ...@@ -30,3 +75,71 @@ func (s *stream) Close() error {
func (s *stream) Conn() inet.Conn { func (s *stream) Conn() inet.Conn {
return s.conn return s.conn
} }
// transport will grab message arrival times, wait until that time, and
// then write the message out when it is scheduled to arrive
func (s *stream) transport(proc process.Process) {
bufsize := 256
buf := new(bytes.Buffer)
ticker := time.NewTicker(time.Millisecond * 4)
// writeBuf writes the contents of buf through to the s.Writer.
// done only when arrival time makes sense.
drainBuf := func() {
if buf.Len() > 0 {
_, err := s.Writer.Write(buf.Bytes())
if err != nil {
return
}
buf.Reset()
}
}
// deliverOrWait is a helper func that processes
// an incoming packet. it waits until the arrival time,
// and then writes things out.
deliverOrWait := func(o *transportObject) {
buffered := len(o.msg) + buf.Len()
now := time.Now()
if now.Before(o.arrivalTime) {
if buffered < bufsize {
buf.Write(o.msg)
return
}
// we do not buffer + return here, instead hanging the
// call (i.e. not accepting any more transportObjects)
// so that we apply back-pressure to the sender.
// this sleep should wake up same time as ticker.
time.Sleep(o.arrivalTime.Sub(now))
}
// ok, we waited our due time. now rite the buf + msg.
// drainBuf first, before we write this message.
drainBuf()
// write this message.
_, err := s.Writer.Write(o.msg)
if err != nil {
log.Error("mock_stream", err)
}
}
for {
select {
case <-proc.Closing():
return // bail out of here.
case o, ok := <-s.toDeliver:
if !ok {
return
}
deliverOrWait(o)
case <-ticker.C: // ok, due to write it out.
drainBuf()
}
}
}
...@@ -3,9 +3,11 @@ package mocknet ...@@ -3,9 +3,11 @@ package mocknet
import ( import (
"bytes" "bytes"
"io" "io"
"math"
"math/rand" "math/rand"
"sync" "sync"
"testing" "testing"
"time"
inet "github.com/ipfs/go-ipfs/p2p/net" inet "github.com/ipfs/go-ipfs/p2p/net"
peer "github.com/ipfs/go-ipfs/p2p/peer" peer "github.com/ipfs/go-ipfs/p2p/peer"
...@@ -478,3 +480,102 @@ func TestAdding(t *testing.T) { ...@@ -478,3 +480,102 @@ func TestAdding(t *testing.T) {
} }
} }
func TestRateLimiting(t *testing.T) {
rl := NewRatelimiter(10)
if !within(rl.Limit(10), time.Duration(float32(time.Second)), time.Millisecond/10) {
t.Fail()
}
if !within(rl.Limit(10), time.Duration(float32(time.Second*2)), time.Millisecond) {
t.Fail()
}
if !within(rl.Limit(10), time.Duration(float32(time.Second*3)), time.Millisecond) {
t.Fail()
}
if within(rl.Limit(10), time.Duration(float32(time.Second*3)), time.Millisecond) {
t.Fail()
}
rl.UpdateBandwidth(50)
if !within(rl.Limit(75), time.Duration(float32(time.Second)*1.5), time.Millisecond/10) {
t.Fail()
}
if within(rl.Limit(75), time.Duration(float32(time.Second)*1.5), time.Millisecond/10) {
t.Fail()
}
rl.UpdateBandwidth(100)
if !within(rl.Limit(1), time.Duration(time.Millisecond*10), time.Millisecond/10) {
t.Fail()
}
if within(rl.Limit(1), time.Duration(time.Millisecond*10), time.Millisecond/10) {
t.Fail()
}
}
func within(t1 time.Duration, t2 time.Duration, tolerance time.Duration) bool {
return math.Abs(float64(t1)-float64(t2)) < float64(tolerance)
}
func TestLimitedStreams(t *testing.T) {
mn, err := FullMeshConnected(context.Background(), 2)
if err != nil {
t.Fatal(err)
}
var wg sync.WaitGroup
messages := 4
messageSize := 500
handler := func(s inet.Stream) {
b := make([]byte, messageSize)
for i := 0; i < messages; i++ {
if _, err := io.ReadFull(s, b); err != nil {
log.Fatal(err)
}
if !bytes.Equal(b[:4], []byte("ping")) {
log.Fatal("bytes mismatch")
}
wg.Done()
}
s.Close()
}
hosts := mn.Hosts()
for _, h := range mn.Hosts() {
h.SetStreamHandler(protocol.TestingID, handler)
}
peers := mn.Peers()
links := mn.LinksBetweenPeers(peers[0], peers[1])
// 1000 byte per second bandwidth
bps := float64(1000)
opts := links[0].Options()
opts.Bandwidth = bps
for _, link := range links {
link.SetOptions(opts)
}
s, err := hosts[0].NewStream(protocol.TestingID, hosts[1].ID())
if err != nil {
t.Fatal(err)
}
filler := make([]byte, messageSize-4)
data := append([]byte("ping"), filler...)
before := time.Now()
for i := 0; i < messages; i++ {
wg.Add(1)
if _, err := s.Write(data); err != nil {
panic(err)
}
}
wg.Wait()
if !within(time.Since(before), time.Duration(time.Second*2), time.Second/3) {
t.Fatal("Expected 2ish seconds but got ", time.Since(before))
}
}
package mocknet
import (
"time"
)
// A ratelimiter is used by a link to determine how long to wait before sending
// data given a bandwidth cap.
type ratelimiter struct {
bandwidth float64 // bytes per nanosecond
allowance float64 // in bytes
maxAllowance float64 // in bytes
lastUpdate time.Time // when allowance was updated last
count int // number of times rate limiting was applied
duration time.Duration // total delay introduced due to rate limiting
}
// Creates a new ratelimiter with bandwidth (in bytes/sec)
func NewRatelimiter(bandwidth float64) *ratelimiter {
// convert bandwidth to bytes per nanosecond
b := bandwidth / float64(time.Second)
return &ratelimiter{
bandwidth: b,
allowance: 0,
maxAllowance: bandwidth,
lastUpdate: time.Now(),
}
}
// Changes bandwidth of a ratelimiter and resets its allowance
func (r *ratelimiter) UpdateBandwidth(bandwidth float64) {
// Convert bandwidth from bytes/second to bytes/nanosecond
b := bandwidth / float64(time.Second)
r.bandwidth = b
// Reset allowance
r.allowance = 0
r.maxAllowance = bandwidth
r.lastUpdate = time.Now()
}
// Returns how long to wait before sending data with length 'dataSize' bytes
func (r *ratelimiter) Limit(dataSize int) time.Duration {
// update time
var duration time.Duration = time.Duration(0)
if r.bandwidth == 0 {
return duration
}
current := time.Now()
elapsedTime := current.Sub(r.lastUpdate)
r.lastUpdate = current
allowance := r.allowance + float64(elapsedTime)*r.bandwidth
// allowance can't exceed bandwidth
if allowance > r.maxAllowance {
allowance = r.maxAllowance
}
allowance -= float64(dataSize)
if allowance < 0 {
// sleep until allowance is back to 0
duration = time.Duration(-allowance / r.bandwidth)
// rate limiting was applied, record stats
r.count++
r.duration += duration
}
r.allowance = allowance
return duration
}
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment