Commit 1b9aa778 authored by Steven Allen

update go-stream-muxer

* Fix the tests to work with separate reset/close methods.
* Ensure we interrupt writes on reset.
* Always delay for the proper amount of time, even when sending short messages.
* Copy buffers as we send them; `Write` is not allowed to hold onto the caller's
  buffer (sketched below). If we run into performance issues, we can always add a buffer pool.
parent 958008b5
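
The last bullet is the standard `io.Writer` contract: an implementation must not retain the slice it is handed, so the mock stream has to copy the payload before queueing it for delayed delivery. A minimal, self-contained sketch of that idea (`queueCopy` and the demo below are illustrative helpers, not code from this commit):

```go
package main

import (
	"fmt"
	"time"
)

// transportObject mirrors the shape of the mocknet type: a payload plus the
// time at which it is scheduled to arrive.
type transportObject struct {
	msg         []byte
	arrivalTime time.Time
}

// queueCopy copies p before handing it to the delivery channel, so the caller
// may reuse p as soon as the call returns -- the contract Write must honor.
func queueCopy(toDeliver chan<- *transportObject, p []byte, arrival time.Time) {
	msg := make([]byte, len(p))
	copy(msg, p)
	toDeliver <- &transportObject{msg: msg, arrivalTime: arrival}
}

func main() {
	ch := make(chan *transportObject, 1)
	buf := []byte("hello")
	queueCopy(ch, buf, time.Now())
	buf[0] = 'X'                   // the caller scribbles over its buffer...
	fmt.Printf("%s\n", (<-ch).msg) // ...but the queued copy still prints "hello"
}
```

Copying on every write is the simple, allocation-heavy choice; a pool of reusable buffers is the usual follow-up if the allocations ever show up in profiles, which is what the buffer-pool remark above refers to.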
@@ -2,7 +2,12 @@ package mocknet
import (
"container/list"
"fmt"
"os"
"os/signal"
"runtime"
"sync"
"syscall"
process "github.com/jbenet/goprocess"
ic "github.com/libp2p/go-libp2p-crypto"
@@ -11,6 +16,19 @@ import (
ma "github.com/multiformats/go-multiaddr"
)
func init() {
go func() {
sigs := make(chan os.Signal, 1)
signal.Notify(sigs, syscall.SIGQUIT)
buf := make([]byte, 1<<20)
for {
<-sigs
stacklen := runtime.Stack(buf, true)
fmt.Printf("=== received SIGQUIT ===\n*** goroutine dump...\n%s\n*** end\n", buf[:stacklen])
}
}()
}
// conn represents one side's perspective of a
// live connection between two peers.
// it goes over a particular link.
@@ -54,7 +72,7 @@ func (c *conn) Close() error {
func (c *conn) teardown() error {
for _, s := range c.allStreams() {
s.Close()
s.Reset()
}
c.net.removeConn(c)
c.net.notifyAll(func(n inet.Notifiee) {
......
@@ -2,7 +2,7 @@ package mocknet
import (
// "fmt"
"net"
"io"
"sync"
"time"
@@ -45,11 +45,12 @@ func (l *link) newConnPair(dialer *peernet) (*conn, *conn) {
}
func (l *link) newStreamPair() (*stream, *stream) {
a, b := net.Pipe()
ra, wb := io.Pipe()
rb, wa := io.Pipe()
s1 := NewStream(a)
s2 := NewStream(b)
return s1, s2
sa := NewStream(wa, ra)
sb := NewStream(wb, rb)
return sa, sb
}
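
`newStreamPair` now builds the full-duplex connection out of two unidirectional `io.Pipe`s instead of `net.Pipe`, giving each stream separate read and write ends that can be closed independently, which the new reset path relies on. A standalone sketch of the same crossover wiring, with `half`/`newPair` as illustrative names:

```go
package main

import (
	"fmt"
	"io"
)

// half is an illustrative stand-in for one side of the mock stream: the read
// end of one io.Pipe and the write end of the other.
type half struct {
	r *io.PipeReader
	w *io.PipeWriter
}

// newPair wires two unidirectional io.Pipes into a full-duplex pair using the
// same crossover as newStreamPair: whatever one side writes, the other reads.
func newPair() (half, half) {
	ra, wb := io.Pipe() // b writes to wb, a reads from ra
	rb, wa := io.Pipe() // a writes to wa, b reads from rb
	return half{r: ra, w: wa}, half{r: rb, w: wb}
}

func main() {
	a, b := newPair()
	go a.w.Write([]byte("ping")) // blocks until the other side reads
	buf := make([]byte, 4)
	n, _ := b.r.Read(buf)
	fmt.Println(string(buf[:n])) // "ping"
}
```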
func (l *link) Networks() []inet.Network {
......
@@ -2,38 +2,52 @@ package mocknet
import (
"bytes"
"errors"
"io"
"net"
"time"
process "github.com/jbenet/goprocess"
inet "github.com/libp2p/go-libp2p-net"
protocol "github.com/libp2p/go-libp2p-protocol"
)
// stream implements inet.Stream
type stream struct {
Pipe net.Conn
write *io.PipeWriter
read *io.PipeReader
conn *conn
toDeliver chan *transportObject
proc process.Process
control chan int
state int
closed chan struct{}
protocol protocol.ID
}
var ErrReset error = errors.New("stream reset")
var ErrClosed error = errors.New("stream closed")
const (
stateOpen = iota
stateClose
stateReset
)
type transportObject struct {
msg []byte
arrivalTime time.Time
}
func NewStream(p net.Conn) *stream {
func NewStream(w *io.PipeWriter, r *io.PipeReader) *stream {
s := &stream{
Pipe: p,
read: r,
write: w,
control: make(chan int),
closed: make(chan struct{}),
toDeliver: make(chan *transportObject),
}
s.proc = process.WithTeardown(s.teardown)
s.proc.Go(s.transport)
go s.transport()
return s
}
@@ -43,8 +57,13 @@ func (s *stream) Write(p []byte) (n int, err error) {
delay := l.GetLatency() + l.RateLimit(len(p))
t := time.Now().Add(delay)
select {
case <-s.proc.Closing(): // bail out if we're closing.
return 0, io.ErrClosedPipe
case <-s.closed: // bail out if we're closing.
switch s.state {
case stateReset:
return 0, ErrReset
case stateClose:
return 0, ErrClosed
}
case s.toDeliver <- &transportObject{msg: p, arrivalTime: t}:
}
return len(p), nil
@@ -59,21 +78,46 @@ func (s *stream) SetProtocol(proto protocol.ID) {
}
func (s *stream) Close() error {
return s.proc.Close()
select {
case s.control <- stateClose:
case <-s.closed:
}
<-s.closed
if s.state == stateReset {
return nil
} else {
return ErrClosed
}
}
// teardown shuts down the stream. it is called by s.proc.Close()
// after all the children of this s.proc (i.e. transport's proc)
// are done.
func (s *stream) teardown() error {
// at this point, no streams are writing.
func (s *stream) Reset() error {
// Cancel any pending reads.
s.write.Close()
select {
case s.control <- stateReset:
case <-s.closed:
}
<-s.closed
if s.state == stateReset {
return nil
} else {
return ErrClosed
}
}
func (s *stream) teardown() {
s.write.Close()
// at this point, no streams are writing.
s.conn.removeStream(s)
s.Pipe.Close()
// Mark as closed.
close(s.closed)
s.conn.net.notifyAll(func(n inet.Notifiee) {
n.ClosedStream(s.conn.net, s)
})
return nil
}
func (s *stream) Conn() inet.Conn {
@@ -81,33 +125,44 @@ func (s *stream) Conn() inet.Conn {
}
func (s *stream) SetDeadline(t time.Time) error {
return s.Pipe.SetDeadline(t)
return &net.OpError{Op: "set", Net: "pipe", Source: nil, Addr: nil, Err: errors.New("deadline not supported")}
}
func (s *stream) SetReadDeadline(t time.Time) error {
return s.Pipe.SetReadDeadline(t)
func (p *stream) SetReadDeadline(t time.Time) error {
return &net.OpError{Op: "set", Net: "pipe", Source: nil, Addr: nil, Err: errors.New("deadline not supported")}
}
func (s *stream) SetWriteDeadline(t time.Time) error {
return s.Pipe.SetWriteDeadline(t)
func (p *stream) SetWriteDeadline(t time.Time) error {
return &net.OpError{Op: "set", Net: "pipe", Source: nil, Addr: nil, Err: errors.New("deadline not supported")}
}
func (s *stream) Read(b []byte) (int, error) {
return s.Pipe.Read(b)
return s.read.Read(b)
}
// transport will grab message arrival times, wait until that time, and
// then write the message out when it is scheduled to arrive
func (s *stream) transport(proc process.Process) {
func (s *stream) transport() {
defer s.teardown()
bufsize := 256
buf := new(bytes.Buffer)
ticker := time.NewTicker(time.Millisecond * 4)
timer := time.NewTimer(0)
if !timer.Stop() {
select {
case <-timer.C:
default:
}
}
// cleanup
defer timer.Stop()
// drainBuf writes the contents of buf through to s.write.
// done only when the arrival time makes sense.
drainBuf := func() {
if buf.Len() > 0 {
_, err := s.Pipe.Write(buf.Bytes())
_, err := s.write.Write(buf.Bytes())
if err != nil {
return
}
@@ -121,44 +176,63 @@ func (s *stream) transport(proc process.Process) {
deliverOrWait := func(o *transportObject) {
buffered := len(o.msg) + buf.Len()
now := time.Now()
if now.Before(o.arrivalTime) {
if buffered < bufsize {
buf.Write(o.msg)
return
// Yes, we can end up extending a timer multiple times if we
// keep on making small writes but that shouldn't be too much of an
// issue. Fixing that would be painful.
if !timer.Stop() {
// FIXME: So, we *shouldn't* need to do this but we hang
// here if we don't... Go bug?
select {
case <-timer.C:
default:
}
// we do not buffer + return here, instead hanging the
// call (i.e. not accepting any more transportObjects)
// so that we apply back-pressure to the sender.
// this sleep should wake up same time as ticker.
time.Sleep(o.arrivalTime.Sub(now))
}
delay := o.arrivalTime.Sub(time.Now())
if delay >= 0 {
timer.Reset(delay)
} else {
timer.Reset(0)
}
// ok, we waited our due time. now write the buf + msg.
// drainBuf first, before we write this message.
drainBuf()
// write this message.
_, err := s.Pipe.Write(o.msg)
if err != nil {
log.Error("mock_stream", err)
if buffered >= bufsize {
select {
case <-timer.C:
case s.state = <-s.control:
return
}
drainBuf()
// write this message.
_, err := s.write.Write(o.msg)
if err != nil {
log.Error("mock_stream", err)
}
} else {
buf.Write(o.msg)
}
}
for {
select {
case <-proc.Closing():
return // bail out of here.
switch s.state {
case stateClose:
drainBuf()
return
case stateReset:
s.read.CloseWithError(ErrReset)
return
default:
panic("invalid state")
case stateOpen:
}
select {
case s.state = <-s.control:
continue
case o, ok := <-s.toDeliver:
if !ok {
return
}
deliverOrWait(o)
case <-ticker.C: // ok, due to write it out.
case <-timer.C: // ok, due to write it out.
drainBuf()
}
}
......
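
The `timer.Stop()` dance above (with the `FIXME`) is the pattern the `time.Timer` documentation recommends for reusing a timer: if `Stop` reports that the timer already fired, a stale tick may still be sitting in the channel and has to be drained before `Reset`, otherwise a later `<-timer.C` returns immediately with the old value. The `default` case keeps the drain from blocking when that tick was already consumed by the main loop, which is exactly the hang the `FIXME` describes. A minimal sketch of the idiom on its own (`resetTimer` is an illustrative helper):

```go
package main

import (
	"fmt"
	"time"
)

// resetTimer safely re-arms t for d: if the timer already fired, drain the
// stale tick first so the next <-t.C sees only the new expiry. The default
// case avoids blocking when the tick was already consumed elsewhere.
func resetTimer(t *time.Timer, d time.Duration) {
	if !t.Stop() {
		select {
		case <-t.C:
		default:
		}
	}
	t.Reset(d)
}

func main() {
	t := time.NewTimer(10 * time.Millisecond)
	time.Sleep(20 * time.Millisecond) // let the timer fire without reading it
	resetTimer(t, 30*time.Millisecond)
	start := time.Now()
	<-t.C
	fmt.Println("fired after", time.Since(start).Round(10*time.Millisecond))
}
```

Run as-is, this reports a delay of roughly 30ms: the stale 10ms tick is drained rather than being mistaken for the new expiry.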
@@ -109,9 +109,9 @@
"version": "0.0.0"
},
{
"hash": "QmVNPgPmEG4QKaDKkxMPKY34Z53n8efzv1sEh4NTsdhto7",
"hash": "QmTMNkpso2WRMevXC8ZxgyBhJvoEHvk24SNeUr9Mf9UM1a",
"name": "go-peerstream",
"version": "1.7.0"
"version": "2.0.2"
},
{
"author": "whyrusleeping",
@@ -151,9 +151,9 @@
},
{
"author": "whyrusleeping",
"hash": "QmdQcv14hCd41WEzNA4avJohJR5sdPqVgFtXZtDz6MTCKx",
"hash": "QmbrUTiVDSK3WGePN18qVjpGYmvXQt6YVPyyGoXWx593uq",
"name": "go-tcp-transport",
"version": "1.2.2"
"version": "1.2.3"
},
{
"author": "whyrusleeping",
@@ -181,21 +181,21 @@
},
{
"author": "whyrusleeping",
"hash": "QmX49btJy5UQuHYmWxrNTmpcnUE5a4upGS6xPYH7mPE46D",
"hash": "QmTi4629yyHJ8qW9sXFjvxJpYcN499tHhERLZYdUqwRU9i",
"name": "go-libp2p-conn",
"version": "1.6.12"
"version": "1.6.13"
},
{
"author": "whyrusleeping",
"hash": "QmahYsGWry85Y7WUe2SX5G4JkH2zifEQAUtJVLZ24aC9DF",
"hash": "QmNa31VPzC561NWwRsJLE7nGYZYuuD2QfpK2b1q9BK54J1",
"name": "go-libp2p-net",
"version": "1.6.12"
"version": "2.0.0"
},
{
"author": "whyrusleeping",
"hash": "QmVjRAPfRtResCMCE4eBqr4Beoa6A89P1YweG9wUS6RqUL",
"hash": "QmQbh3Rb7KM37As3vkHYnEFnzkVXNCP8EYGtHz6g2fXk14",
"name": "go-libp2p-metrics",
"version": "1.6.10"
"version": "2.0.0"
},
{
"author": "whyrusleeping",
@@ -205,15 +205,15 @@
},
{
"author": "whyrusleeping",
"hash": "QmUwW8jMQDxXhLD2j4EfWqLEMX3MsvyWcWGvJPVDh1aTmu",
"hash": "QmaSxYRuMq4pkpBBG2CYaRrPx2z7NmMVEs34b9g61biQA6",
"name": "go-libp2p-host",
"version": "1.3.19"
"version": "2.0.0"
},
{
"author": "whyrusleeping",
"hash": "QmQUmDr1DMDDy6KMSsJuyV9nVD7dJZ9iWxXESQWPvte2NP",
"hash": "QmW97nvnsknsoN8NUz8CUF5hVVaicLgNaEb6EZMk3oB943",
"name": "go-libp2p-swarm",
"version": "1.7.7"
"version": "2.0.2"
},
{
"author": "whyrusleeping",
@@ -223,15 +223,15 @@
},
{
"author": "whyrusleeping",
"hash": "QmQ1bJEsmdEiGfTQRoj6CsshWmAKduAEDEbwzbvk5QT5Ui",
"hash": "QmP4cEjmvf8tC6ykxKXrvmYLo8vqtGsgduMatjbAKnBzv8",
"name": "go-libp2p-netutil",
"version": "0.2.25"
"version": "0.3.1"
},
{
"author": "whyrusleeping",
"hash": "QmSmgF5Nnmf1Ygkv96xCmUdPk4QPx3JotTA7sqwXpoxCV2",
"hash": "QmPZRCaYeNLMo5GfcRS2rv9ZxVuXXt6MFg9dWLmgsdXKCw",
"name": "go-libp2p-blankhost",
"version": "0.1.18"
"version": "0.2.0"
},
{
"author": "whyrusleeping",
@@ -241,9 +241,9 @@
},
{
"author": "whyrusleeping",
"hash": "Qmbn7RYyWzBVXiUp9jZ1dA4VADHy9DtS7iZLwfhEUQvm3U",
"hash": "QmfTJ3UpS5ycNX7uQvPUSSRjGxk9EhUG7SyCstX6tCoNXS",
"name": "go-smux-yamux",
"version": "1.2.0"
"version": "2.0.0"
},
{
"author": "whyrusleeping",
@@ -253,15 +253,15 @@
},
{
"author": "whyrusleeping",
"hash": "QmRVYfZ7tWNHPBzWiG6KWGzvT2hcGems8srihsQE29x1U5",
"hash": "QmVniQJkdzLZaZwzwMdd3dJTvWiJ1DQEkreVy6hs6h7Vk5",
"name": "go-smux-multistream",
"version": "1.5.5"
"version": "2.0.0"
},
{
"author": "whyrusleeping",
"hash": "QmTuwwqGf4NH2Jj3opKtaKx45ge4RiXSCtvUkb7a4gk2ua",
"hash": "QmYUpfXEBqLdtiSUDzzc8hLfcELPHiPtANF12EpEX1WCVB",
"name": "go-libp2p-connmgr",
"version": "0.1.3"
"version": "0.2.0"
},
{
"author": "whyrusleeping",
@@ -271,9 +271,9 @@
},
{
"author": "vyzo",
"hash": "QmYkTCcfrPdR5QMasnhh3FVRVNEKzH3YsvuBPpB4YPgwWC",
"hash": "QmVXc7cgEkxWDELn9sGV9r1HbqfQR9YCUmbsrkp1rcXSjn",
"name": "go-libp2p-circuit",
"version": "1.1.8"
"version": "2.0.1"
},
{
"author": "lgierth",
......