From 1b9aa7789c228b7d493de53a55c49f9f117a92da Mon Sep 17 00:00:00 2001 From: Steven Allen Date: Wed, 13 Sep 2017 15:13:27 -0700 Subject: [PATCH] update go-stream-muxer * Fix the tests to work with separate reset/close methods. * Ensure we interrupt writes on reset. * Always delay the proper time even if we're sending short messages. * Copy buffers as we send them. `Write` is not allowed to hang onto buffers. If we run into performance issues, we can always add a buffer pool. --- p2p/net/mock/mock_conn.go | 20 +++- p2p/net/mock/mock_link.go | 11 ++- p2p/net/mock/mock_stream.go | 176 +++++++++++++++++++++++++----------- package.json | 52 +++++------ 4 files changed, 176 insertions(+), 83 deletions(-) diff --git a/p2p/net/mock/mock_conn.go b/p2p/net/mock/mock_conn.go index ae90cdd..a826d66 100644 --- a/p2p/net/mock/mock_conn.go +++ b/p2p/net/mock/mock_conn.go @@ -2,7 +2,12 @@ package mocknet import ( "container/list" + "fmt" + "os" + "os/signal" + "runtime" "sync" + "syscall" process "github.com/jbenet/goprocess" ic "github.com/libp2p/go-libp2p-crypto" @@ -11,6 +16,19 @@ import ( ma "github.com/multiformats/go-multiaddr" ) +func init() { + go func() { + sigs := make(chan os.Signal, 1) + signal.Notify(sigs, syscall.SIGQUIT) + buf := make([]byte, 1<<20) + for { + <-sigs + stacklen := runtime.Stack(buf, true) + fmt.Printf("=== received SIGQUIT ===\n*** goroutine dump...\n%s\n*** end\n", buf[:stacklen]) + } + }() +} + // conn represents one side's perspective of a // live connection between two peers. // it goes over a particular link. @@ -54,7 +72,7 @@ func (c *conn) Close() error { func (c *conn) teardown() error { for _, s := range c.allStreams() { - s.Close() + s.Reset() } c.net.removeConn(c) c.net.notifyAll(func(n inet.Notifiee) { diff --git a/p2p/net/mock/mock_link.go b/p2p/net/mock/mock_link.go index ae74da8..d4d6053 100644 --- a/p2p/net/mock/mock_link.go +++ b/p2p/net/mock/mock_link.go @@ -2,7 +2,7 @@ package mocknet import ( // "fmt" - "net" + "io" "sync" "time" @@ -45,11 +45,12 @@ func (l *link) newConnPair(dialer *peernet) (*conn, *conn) { } func (l *link) newStreamPair() (*stream, *stream) { - a, b := net.Pipe() + ra, wb := io.Pipe() + rb, wa := io.Pipe() - s1 := NewStream(a) - s2 := NewStream(b) - return s1, s2 + sa := NewStream(wa, ra) + sb := NewStream(wb, rb) + return sa, sb } func (l *link) Networks() []inet.Network { diff --git a/p2p/net/mock/mock_stream.go b/p2p/net/mock/mock_stream.go index 2529651..2f0a4ee 100644 --- a/p2p/net/mock/mock_stream.go +++ b/p2p/net/mock/mock_stream.go @@ -2,38 +2,52 @@ package mocknet import ( "bytes" + "errors" "io" "net" "time" - process "github.com/jbenet/goprocess" inet "github.com/libp2p/go-libp2p-net" protocol "github.com/libp2p/go-libp2p-protocol" ) // stream implements inet.Stream type stream struct { - Pipe net.Conn + write *io.PipeWriter + read *io.PipeReader conn *conn toDeliver chan *transportObject - proc process.Process + control chan int + state int + closed chan struct{} protocol protocol.ID } +var ErrReset error = errors.New("stream reset") +var ErrClosed error = errors.New("stream closed") + +const ( + stateOpen = iota + stateClose + stateReset +) + type transportObject struct { msg []byte arrivalTime time.Time } -func NewStream(p net.Conn) *stream { +func NewStream(w *io.PipeWriter, r *io.PipeReader) *stream { s := &stream{ - Pipe: p, + read: r, + write: w, + control: make(chan int), + closed: make(chan struct{}), toDeliver: make(chan *transportObject), } - s.proc = process.WithTeardown(s.teardown) - s.proc.Go(s.transport) + go s.transport() return s } @@ -43,8 +57,13 @@ func (s *stream) Write(p []byte) (n int, err error) { delay := l.GetLatency() + l.RateLimit(len(p)) t := time.Now().Add(delay) select { - case <-s.proc.Closing(): // bail out if we're closing. - return 0, io.ErrClosedPipe + case <-s.closed: // bail out if we're closing. + switch s.state { + case stateReset: + return 0, ErrReset + case stateClose: + return 0, ErrClosed + } case s.toDeliver <- &transportObject{msg: p, arrivalTime: t}: } return len(p), nil @@ -59,21 +78,46 @@ func (s *stream) SetProtocol(proto protocol.ID) { } func (s *stream) Close() error { - return s.proc.Close() + select { + case s.control <- stateClose: + case <-s.closed: + } + <-s.closed + if s.state == stateReset { + return nil + } else { + return ErrClosed + } } -// teardown shuts down the stream. it is called by s.proc.Close() -// after all the children of this s.proc (i.e. transport's proc) -// are done. -func (s *stream) teardown() error { - // at this point, no streams are writing. +func (s *stream) Reset() error { + // Cancel any pending reads. + s.write.Close() + select { + case s.control <- stateReset: + case <-s.closed: + } + <-s.closed + if s.state == stateReset { + return nil + } else { + return ErrClosed + } +} + +func (s *stream) teardown() { + s.write.Close() + + // at this point, no streams are writing. s.conn.removeStream(s) - s.Pipe.Close() + + // Mark as closed. + close(s.closed) + s.conn.net.notifyAll(func(n inet.Notifiee) { n.ClosedStream(s.conn.net, s) }) - return nil } func (s *stream) Conn() inet.Conn { @@ -81,33 +125,44 @@ func (s *stream) Conn() inet.Conn { } func (s *stream) SetDeadline(t time.Time) error { - return s.Pipe.SetDeadline(t) + return &net.OpError{Op: "set", Net: "pipe", Source: nil, Addr: nil, Err: errors.New("deadline not supported")} } -func (s *stream) SetReadDeadline(t time.Time) error { - return s.Pipe.SetReadDeadline(t) +func (p *stream) SetReadDeadline(t time.Time) error { + return &net.OpError{Op: "set", Net: "pipe", Source: nil, Addr: nil, Err: errors.New("deadline not supported")} } -func (s *stream) SetWriteDeadline(t time.Time) error { - return s.Pipe.SetWriteDeadline(t) +func (p *stream) SetWriteDeadline(t time.Time) error { + return &net.OpError{Op: "set", Net: "pipe", Source: nil, Addr: nil, Err: errors.New("deadline not supported")} } func (s *stream) Read(b []byte) (int, error) { - return s.Pipe.Read(b) + return s.read.Read(b) } // transport will grab message arrival times, wait until that time, and // then write the message out when it is scheduled to arrive -func (s *stream) transport(proc process.Process) { +func (s *stream) transport() { + defer s.teardown() + bufsize := 256 buf := new(bytes.Buffer) - ticker := time.NewTicker(time.Millisecond * 4) + timer := time.NewTimer(0) + if !timer.Stop() { + select { + case <-timer.C: + default: + } + } + + // cleanup + defer timer.Stop() // writeBuf writes the contents of buf through to the s.Writer. // done only when arrival time makes sense. drainBuf := func() { if buf.Len() > 0 { - _, err := s.Pipe.Write(buf.Bytes()) + _, err := s.write.Write(buf.Bytes()) if err != nil { return } @@ -121,44 +176,63 @@ func (s *stream) transport(proc process.Process) { deliverOrWait := func(o *transportObject) { buffered := len(o.msg) + buf.Len() - now := time.Now() - if now.Before(o.arrivalTime) { - if buffered < bufsize { - buf.Write(o.msg) - return + // Yes, we can end up extending a timer multiple times if we + // keep on making small writes but that shouldn't be too much of an + // issue. Fixing that would be painful. + if !timer.Stop() { + // FIXME: So, we *shouldn't* need to do this but we hang + // here if we don't... Go bug? + select { + case <-timer.C: + default: } - - // we do not buffer + return here, instead hanging the - // call (i.e. not accepting any more transportObjects) - // so that we apply back-pressure to the sender. - // this sleep should wake up same time as ticker. - time.Sleep(o.arrivalTime.Sub(now)) + } + delay := o.arrivalTime.Sub(time.Now()) + if delay >= 0 { + timer.Reset(delay) + } else { + timer.Reset(0) } - // ok, we waited our due time. now rite the buf + msg. - - // drainBuf first, before we write this message. - drainBuf() - - // write this message. - _, err := s.Pipe.Write(o.msg) - if err != nil { - log.Error("mock_stream", err) + if buffered >= bufsize { + select { + case <-timer.C: + case s.state = <-s.control: + return + } + drainBuf() + // write this message. + _, err := s.write.Write(o.msg) + if err != nil { + log.Error("mock_stream", err) + } + } else { + buf.Write(o.msg) } } for { - select { - case <-proc.Closing(): - return // bail out of here. + switch s.state { + case stateClose: + drainBuf() + return + case stateReset: + s.read.CloseWithError(ErrReset) + return + default: + panic("invalid state") + case stateOpen: + } + select { + case s.state = <-s.control: + continue case o, ok := <-s.toDeliver: if !ok { return } deliverOrWait(o) - - case <-ticker.C: // ok, due to write it out. + case <-timer.C: // ok, due to write it out. drainBuf() } } diff --git a/package.json b/package.json index df5a55c..d4c3769 100644 --- a/package.json +++ b/package.json @@ -109,9 +109,9 @@ "version": "0.0.0" }, { - "hash": "QmVNPgPmEG4QKaDKkxMPKY34Z53n8efzv1sEh4NTsdhto7", + "hash": "QmTMNkpso2WRMevXC8ZxgyBhJvoEHvk24SNeUr9Mf9UM1a", "name": "go-peerstream", - "version": "1.7.0" + "version": "2.0.2" }, { "author": "whyrusleeping", @@ -151,9 +151,9 @@ }, { "author": "whyrusleeping", - "hash": "QmdQcv14hCd41WEzNA4avJohJR5sdPqVgFtXZtDz6MTCKx", + "hash": "QmbrUTiVDSK3WGePN18qVjpGYmvXQt6YVPyyGoXWx593uq", "name": "go-tcp-transport", - "version": "1.2.2" + "version": "1.2.3" }, { "author": "whyrusleeping", @@ -181,21 +181,21 @@ }, { "author": "whyrusleeping", - "hash": "QmX49btJy5UQuHYmWxrNTmpcnUE5a4upGS6xPYH7mPE46D", + "hash": "QmTi4629yyHJ8qW9sXFjvxJpYcN499tHhERLZYdUqwRU9i", "name": "go-libp2p-conn", - "version": "1.6.12" + "version": "1.6.13" }, { "author": "whyrusleeping", - "hash": "QmahYsGWry85Y7WUe2SX5G4JkH2zifEQAUtJVLZ24aC9DF", + "hash": "QmNa31VPzC561NWwRsJLE7nGYZYuuD2QfpK2b1q9BK54J1", "name": "go-libp2p-net", - "version": "1.6.12" + "version": "2.0.0" }, { "author": "whyrusleeping", - "hash": "QmVjRAPfRtResCMCE4eBqr4Beoa6A89P1YweG9wUS6RqUL", + "hash": "QmQbh3Rb7KM37As3vkHYnEFnzkVXNCP8EYGtHz6g2fXk14", "name": "go-libp2p-metrics", - "version": "1.6.10" + "version": "2.0.0" }, { "author": "whyrusleeping", @@ -205,15 +205,15 @@ }, { "author": "whyrusleeping", - "hash": "QmUwW8jMQDxXhLD2j4EfWqLEMX3MsvyWcWGvJPVDh1aTmu", + "hash": "QmaSxYRuMq4pkpBBG2CYaRrPx2z7NmMVEs34b9g61biQA6", "name": "go-libp2p-host", - "version": "1.3.19" + "version": "2.0.0" }, { "author": "whyrusleeping", - "hash": "QmQUmDr1DMDDy6KMSsJuyV9nVD7dJZ9iWxXESQWPvte2NP", + "hash": "QmW97nvnsknsoN8NUz8CUF5hVVaicLgNaEb6EZMk3oB943", "name": "go-libp2p-swarm", - "version": "1.7.7" + "version": "2.0.2" }, { "author": "whyrusleeping", @@ -223,15 +223,15 @@ }, { "author": "whyrusleeping", - "hash": "QmQ1bJEsmdEiGfTQRoj6CsshWmAKduAEDEbwzbvk5QT5Ui", + "hash": "QmP4cEjmvf8tC6ykxKXrvmYLo8vqtGsgduMatjbAKnBzv8", "name": "go-libp2p-netutil", - "version": "0.2.25" + "version": "0.3.1" }, { "author": "whyrusleeping", - "hash": "QmSmgF5Nnmf1Ygkv96xCmUdPk4QPx3JotTA7sqwXpoxCV2", + "hash": "QmPZRCaYeNLMo5GfcRS2rv9ZxVuXXt6MFg9dWLmgsdXKCw", "name": "go-libp2p-blankhost", - "version": "0.1.18" + "version": "0.2.0" }, { "author": "whyrusleeping", @@ -241,9 +241,9 @@ }, { "author": "whyrusleeping", - "hash": "Qmbn7RYyWzBVXiUp9jZ1dA4VADHy9DtS7iZLwfhEUQvm3U", + "hash": "QmfTJ3UpS5ycNX7uQvPUSSRjGxk9EhUG7SyCstX6tCoNXS", "name": "go-smux-yamux", - "version": "1.2.0" + "version": "2.0.0" }, { "author": "whyrusleeping", @@ -253,15 +253,15 @@ }, { "author": "whyrusleeping", - "hash": "QmRVYfZ7tWNHPBzWiG6KWGzvT2hcGems8srihsQE29x1U5", + "hash": "QmVniQJkdzLZaZwzwMdd3dJTvWiJ1DQEkreVy6hs6h7Vk5", "name": "go-smux-multistream", - "version": "1.5.5" + "version": "2.0.0" }, { "author": "whyrusleeping", - "hash": "QmTuwwqGf4NH2Jj3opKtaKx45ge4RiXSCtvUkb7a4gk2ua", + "hash": "QmYUpfXEBqLdtiSUDzzc8hLfcELPHiPtANF12EpEX1WCVB", "name": "go-libp2p-connmgr", - "version": "0.1.3" + "version": "0.2.0" }, { "author": "whyrusleeping", @@ -271,9 +271,9 @@ }, { "author": "vyzo", - "hash": "QmYkTCcfrPdR5QMasnhh3FVRVNEKzH3YsvuBPpB4YPgwWC", + "hash": "QmVXc7cgEkxWDELn9sGV9r1HbqfQR9YCUmbsrkp1rcXSjn", "name": "go-libp2p-circuit", - "version": "1.1.8" + "version": "2.0.1" }, { "author": "lgierth", -- GitLab