diff --git a/.gx/lastpubver b/.gx/lastpubver index 6ac6417fa841a2872394bfebc88b231ce874584c..11784abd8678a4863e817d95d5ef8511fc9a656a 100644 --- a/.gx/lastpubver +++ b/.gx/lastpubver @@ -1 +1 @@ -4.5.5: QmXZyBQMkqSYigxhJResC6fLWDGFhbphK67eZoqMDUvBmK +5.0.0: QmTykKqiBX2QyDjrJDKX6qzwU2ybMtWqaLET23mgyDSojx diff --git a/examples/echo/main.go b/examples/echo/main.go index 7297e6b10c99c52dc817ad2c24bb9c7947a3913f..cc37453da3f8416e73d525a1e3a49788385c5542 100644 --- a/examples/echo/main.go +++ b/examples/echo/main.go @@ -134,8 +134,12 @@ func main() { // a user-defined protocol name. ha.SetStreamHandler("/echo/1.0.0", func(s net.Stream) { log.Println("Got a new stream!") - defer s.Close() - doEcho(s) + if err := doEcho(s); err != nil { + log.Println(err) + s.Reset() + } else { + s.Close() + } }) if *target == "" { @@ -194,18 +198,14 @@ func main() { } // doEcho reads a line of data a stream and writes it back -func doEcho(s net.Stream) { +func doEcho(s net.Stream) error { buf := bufio.NewReader(s) str, err := buf.ReadString('\n') if err != nil { - log.Println(err) - return + return err } log.Printf("read: %s\n", str) _, err = s.Write([]byte(str)) - if err != nil { - log.Println(err) - return - } + return err } diff --git a/examples/http-proxy/proxy.go b/examples/http-proxy/proxy.go index 235894865a4771990431f762b4b5e2f0f9e243a6..79999bbbde2f95a8355d50ed2bd3f6459a8d886d 100644 --- a/examples/http-proxy/proxy.go +++ b/examples/http-proxy/proxy.go @@ -109,6 +109,7 @@ func streamHandler(stream inet.Stream) { // Read the HTTP request from the buffer req, err := http.ReadRequest(buf) if err != nil { + stream.Reset() log.Println(err) return } @@ -132,6 +133,7 @@ func streamHandler(stream inet.Stream) { fmt.Printf("Making request to %s\n", req.URL) resp, err := http.DefaultTransport.RoundTrip(outreq) if err != nil { + stream.Reset() log.Println(err) return } @@ -176,6 +178,7 @@ func (p *ProxyService) ServeHTTP(w http.ResponseWriter, r *http.Request) { // r.Write() writes the HTTP request to the stream. err = r.Write(stream) if err != nil { + stream.Reset() log.Println(err) http.Error(w, err.Error(), http.StatusServiceUnavailable) return @@ -186,6 +189,7 @@ func (p *ProxyService) ServeHTTP(w http.ResponseWriter, r *http.Request) { buf := bufio.NewReader(stream) resp, err := http.ReadResponse(buf, r) if err != nil { + stream.Reset() log.Println(err) http.Error(w, err.Error(), http.StatusServiceUnavailable) return diff --git a/p2p/host/basic/basic_host.go b/p2p/host/basic/basic_host.go index 3f0f11356e365c6f5cfccd564a21c33f8ae677fa..3a7874b3e7a2af28b4d0bfbc04b7e80b0823e30e 100644 --- a/p2p/host/basic/basic_host.go +++ b/p2p/host/basic/basic_host.go @@ -240,7 +240,7 @@ func (h *BasicHost) newStreamHandler(s inet.Stream) { if h.negtimeout > 0 { if err := s.SetDeadline(time.Now().Add(h.negtimeout)); err != nil { log.Error("setting stream deadline: ", err) - s.Close() + s.Reset() return } } @@ -257,7 +257,7 @@ func (h *BasicHost) newStreamHandler(s inet.Stream) { } else { log.Warning("protocol mux failed: %s (took %s)", err, took) } - s.Close() + s.Reset() return } @@ -269,7 +269,7 @@ func (h *BasicHost) newStreamHandler(s inet.Stream) { if h.negtimeout > 0 { if err := s.SetDeadline(time.Time{}); err != nil { log.Error("resetting stream deadline: ", err) - s.Close() + s.Reset() return } } @@ -364,7 +364,7 @@ func (h *BasicHost) NewStream(ctx context.Context, p peer.ID, pids ...protocol.I selected, err := msmux.SelectOneOf(protoStrs, s) if err != nil { - s.Close() + s.Reset() return nil, err } selpid := protocol.ID(selected) diff --git a/p2p/host/basic/basic_host_test.go b/p2p/host/basic/basic_host_test.go index af4a8306b7a0f93e0d855308992ff44fd78e959c..4c14934507ef557f8d8a982acf62ff12077973cf 100644 --- a/p2p/host/basic/basic_host_test.go +++ b/p2p/host/basic/basic_host_test.go @@ -180,7 +180,7 @@ func TestHostProtoMismatch(t *testing.T) { h1.SetStreamHandler("/super", func(s inet.Stream) { t.Error("shouldnt get here") - s.Close() + s.Reset() }) _, err := h2.NewStream(ctx, h1.ID(), "/foo", "/bar", "/baz/1.0.0") diff --git a/p2p/net/mock/mock_conn.go b/p2p/net/mock/mock_conn.go index ae90cdd5e476aa630816f586c0ae46c6515947e5..fcb634624c4716a16ab771f7770983816b77c0bd 100644 --- a/p2p/net/mock/mock_conn.go +++ b/p2p/net/mock/mock_conn.go @@ -54,7 +54,7 @@ func (c *conn) Close() error { func (c *conn) teardown() error { for _, s := range c.allStreams() { - s.Close() + s.Reset() } c.net.removeConn(c) c.net.notifyAll(func(n inet.Notifiee) { diff --git a/p2p/net/mock/mock_link.go b/p2p/net/mock/mock_link.go index ae74da85c75d647a231c8234143e8540c12a6901..d4d605310d78b29f820f45744540416930894262 100644 --- a/p2p/net/mock/mock_link.go +++ b/p2p/net/mock/mock_link.go @@ -2,7 +2,7 @@ package mocknet import ( // "fmt" - "net" + "io" "sync" "time" @@ -45,11 +45,12 @@ func (l *link) newConnPair(dialer *peernet) (*conn, *conn) { } func (l *link) newStreamPair() (*stream, *stream) { - a, b := net.Pipe() + ra, wb := io.Pipe() + rb, wa := io.Pipe() - s1 := NewStream(a) - s2 := NewStream(b) - return s1, s2 + sa := NewStream(wa, ra) + sb := NewStream(wb, rb) + return sa, sb } func (l *link) Networks() []inet.Network { diff --git a/p2p/net/mock/mock_stream.go b/p2p/net/mock/mock_stream.go index 25296516b94f1f53f8e52d98000c3809b5beaa75..2f0a4ee7122da27b0dcffaeffb762a1b1ae9eaf5 100644 --- a/p2p/net/mock/mock_stream.go +++ b/p2p/net/mock/mock_stream.go @@ -2,38 +2,52 @@ package mocknet import ( "bytes" + "errors" "io" "net" "time" - process "github.com/jbenet/goprocess" inet "github.com/libp2p/go-libp2p-net" protocol "github.com/libp2p/go-libp2p-protocol" ) // stream implements inet.Stream type stream struct { - Pipe net.Conn + write *io.PipeWriter + read *io.PipeReader conn *conn toDeliver chan *transportObject - proc process.Process + control chan int + state int + closed chan struct{} protocol protocol.ID } +var ErrReset error = errors.New("stream reset") +var ErrClosed error = errors.New("stream closed") + +const ( + stateOpen = iota + stateClose + stateReset +) + type transportObject struct { msg []byte arrivalTime time.Time } -func NewStream(p net.Conn) *stream { +func NewStream(w *io.PipeWriter, r *io.PipeReader) *stream { s := &stream{ - Pipe: p, + read: r, + write: w, + control: make(chan int), + closed: make(chan struct{}), toDeliver: make(chan *transportObject), } - s.proc = process.WithTeardown(s.teardown) - s.proc.Go(s.transport) + go s.transport() return s } @@ -43,8 +57,13 @@ func (s *stream) Write(p []byte) (n int, err error) { delay := l.GetLatency() + l.RateLimit(len(p)) t := time.Now().Add(delay) select { - case <-s.proc.Closing(): // bail out if we're closing. - return 0, io.ErrClosedPipe + case <-s.closed: // bail out if we're closing. + switch s.state { + case stateReset: + return 0, ErrReset + case stateClose: + return 0, ErrClosed + } case s.toDeliver <- &transportObject{msg: p, arrivalTime: t}: } return len(p), nil @@ -59,21 +78,46 @@ func (s *stream) SetProtocol(proto protocol.ID) { } func (s *stream) Close() error { - return s.proc.Close() + select { + case s.control <- stateClose: + case <-s.closed: + } + <-s.closed + if s.state == stateReset { + return nil + } else { + return ErrClosed + } } -// teardown shuts down the stream. it is called by s.proc.Close() -// after all the children of this s.proc (i.e. transport's proc) -// are done. -func (s *stream) teardown() error { - // at this point, no streams are writing. +func (s *stream) Reset() error { + // Cancel any pending reads. + s.write.Close() + select { + case s.control <- stateReset: + case <-s.closed: + } + <-s.closed + if s.state == stateReset { + return nil + } else { + return ErrClosed + } +} + +func (s *stream) teardown() { + s.write.Close() + + // at this point, no streams are writing. s.conn.removeStream(s) - s.Pipe.Close() + + // Mark as closed. + close(s.closed) + s.conn.net.notifyAll(func(n inet.Notifiee) { n.ClosedStream(s.conn.net, s) }) - return nil } func (s *stream) Conn() inet.Conn { @@ -81,33 +125,44 @@ func (s *stream) Conn() inet.Conn { } func (s *stream) SetDeadline(t time.Time) error { - return s.Pipe.SetDeadline(t) + return &net.OpError{Op: "set", Net: "pipe", Source: nil, Addr: nil, Err: errors.New("deadline not supported")} } -func (s *stream) SetReadDeadline(t time.Time) error { - return s.Pipe.SetReadDeadline(t) +func (p *stream) SetReadDeadline(t time.Time) error { + return &net.OpError{Op: "set", Net: "pipe", Source: nil, Addr: nil, Err: errors.New("deadline not supported")} } -func (s *stream) SetWriteDeadline(t time.Time) error { - return s.Pipe.SetWriteDeadline(t) +func (p *stream) SetWriteDeadline(t time.Time) error { + return &net.OpError{Op: "set", Net: "pipe", Source: nil, Addr: nil, Err: errors.New("deadline not supported")} } func (s *stream) Read(b []byte) (int, error) { - return s.Pipe.Read(b) + return s.read.Read(b) } // transport will grab message arrival times, wait until that time, and // then write the message out when it is scheduled to arrive -func (s *stream) transport(proc process.Process) { +func (s *stream) transport() { + defer s.teardown() + bufsize := 256 buf := new(bytes.Buffer) - ticker := time.NewTicker(time.Millisecond * 4) + timer := time.NewTimer(0) + if !timer.Stop() { + select { + case <-timer.C: + default: + } + } + + // cleanup + defer timer.Stop() // writeBuf writes the contents of buf through to the s.Writer. // done only when arrival time makes sense. drainBuf := func() { if buf.Len() > 0 { - _, err := s.Pipe.Write(buf.Bytes()) + _, err := s.write.Write(buf.Bytes()) if err != nil { return } @@ -121,44 +176,63 @@ func (s *stream) transport(proc process.Process) { deliverOrWait := func(o *transportObject) { buffered := len(o.msg) + buf.Len() - now := time.Now() - if now.Before(o.arrivalTime) { - if buffered < bufsize { - buf.Write(o.msg) - return + // Yes, we can end up extending a timer multiple times if we + // keep on making small writes but that shouldn't be too much of an + // issue. Fixing that would be painful. + if !timer.Stop() { + // FIXME: So, we *shouldn't* need to do this but we hang + // here if we don't... Go bug? + select { + case <-timer.C: + default: } - - // we do not buffer + return here, instead hanging the - // call (i.e. not accepting any more transportObjects) - // so that we apply back-pressure to the sender. - // this sleep should wake up same time as ticker. - time.Sleep(o.arrivalTime.Sub(now)) + } + delay := o.arrivalTime.Sub(time.Now()) + if delay >= 0 { + timer.Reset(delay) + } else { + timer.Reset(0) } - // ok, we waited our due time. now rite the buf + msg. - - // drainBuf first, before we write this message. - drainBuf() - - // write this message. - _, err := s.Pipe.Write(o.msg) - if err != nil { - log.Error("mock_stream", err) + if buffered >= bufsize { + select { + case <-timer.C: + case s.state = <-s.control: + return + } + drainBuf() + // write this message. + _, err := s.write.Write(o.msg) + if err != nil { + log.Error("mock_stream", err) + } + } else { + buf.Write(o.msg) } } for { - select { - case <-proc.Closing(): - return // bail out of here. + switch s.state { + case stateClose: + drainBuf() + return + case stateReset: + s.read.CloseWithError(ErrReset) + return + default: + panic("invalid state") + case stateOpen: + } + select { + case s.state = <-s.control: + continue case o, ok := <-s.toDeliver: if !ok { return } deliverOrWait(o) - - case <-ticker.C: // ok, due to write it out. + case <-timer.C: // ok, due to write it out. drainBuf() } } diff --git a/p2p/protocol/ping/ping.go b/p2p/protocol/ping/ping.go index d6e571e0f31f9e28382df0b5116b8e5ad1bc3fa8..54464f3080b125da15cdce82e0682aa203b8dab3 100644 --- a/p2p/protocol/ping/ping.go +++ b/p2p/protocol/ping/ping.go @@ -33,33 +33,42 @@ func NewPingService(h host.Host) *PingService { } func (p *PingService) PingHandler(s inet.Stream) { - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - buf := make([]byte, PingSize) + errCh := make(chan error, 1) + defer close(errCh) timer := time.NewTimer(pingTimeout) defer timer.Stop() go func() { select { case <-timer.C: - case <-ctx.Done(): + log.Debug("ping timeout") + s.Reset() + case err, ok := <-errCh: + if ok { + log.Debug(err) + if err == io.EOF { + s.Close() + } else { + s.Reset() + } + } else { + log.Error("ping loop failed without error") + } } - - s.Close() }() for { _, err := io.ReadFull(s, buf) if err != nil { - log.Debug(err) + errCh <- err return } _, err = s.Write(buf) if err != nil { - log.Debug(err) + errCh <- err return } @@ -84,6 +93,7 @@ func (ps *PingService) Ping(ctx context.Context, p peer.ID) (<-chan time.Duratio default: t, err := ping(s) if err != nil { + s.Reset() log.Debugf("ping error: %s", err) return } diff --git a/p2p/test/reconnects/reconnect_test.go b/p2p/test/reconnects/reconnect_test.go index a79ac71e7e5babbb677898284b615cb0103b2772..55b7b1ce1e9b35f5a5a328aa28452b37926ba889 100644 --- a/p2p/test/reconnects/reconnect_test.go +++ b/p2p/test/reconnects/reconnect_test.go @@ -31,8 +31,12 @@ func EchoStreamHandler(stream inet.Stream) { c := stream.Conn() log.Debugf("%s echoing %s", c.LocalPeer(), c.RemotePeer()) go func() { - defer stream.Close() - io.Copy(stream, stream) + _, err := io.Copy(stream, stream) + if err == nil { + stream.Close() + } else { + stream.Reset() + } }() } diff --git a/package.json b/package.json index df5a55ca563cf0d9efb35ed9c78b12afd9cfb2c2..aa94f15fcf39b70a764f4ba82673134dea29beaa 100644 --- a/package.json +++ b/package.json @@ -109,9 +109,9 @@ "version": "0.0.0" }, { - "hash": "QmVNPgPmEG4QKaDKkxMPKY34Z53n8efzv1sEh4NTsdhto7", + "hash": "QmTMNkpso2WRMevXC8ZxgyBhJvoEHvk24SNeUr9Mf9UM1a", "name": "go-peerstream", - "version": "1.7.0" + "version": "2.0.2" }, { "author": "whyrusleeping", @@ -151,9 +151,9 @@ }, { "author": "whyrusleeping", - "hash": "QmdQcv14hCd41WEzNA4avJohJR5sdPqVgFtXZtDz6MTCKx", + "hash": "QmbrUTiVDSK3WGePN18qVjpGYmvXQt6YVPyyGoXWx593uq", "name": "go-tcp-transport", - "version": "1.2.2" + "version": "1.2.3" }, { "author": "whyrusleeping", @@ -181,21 +181,21 @@ }, { "author": "whyrusleeping", - "hash": "QmX49btJy5UQuHYmWxrNTmpcnUE5a4upGS6xPYH7mPE46D", + "hash": "QmTi4629yyHJ8qW9sXFjvxJpYcN499tHhERLZYdUqwRU9i", "name": "go-libp2p-conn", - "version": "1.6.12" + "version": "1.6.13" }, { "author": "whyrusleeping", - "hash": "QmahYsGWry85Y7WUe2SX5G4JkH2zifEQAUtJVLZ24aC9DF", + "hash": "QmNa31VPzC561NWwRsJLE7nGYZYuuD2QfpK2b1q9BK54J1", "name": "go-libp2p-net", - "version": "1.6.12" + "version": "2.0.0" }, { "author": "whyrusleeping", - "hash": "QmVjRAPfRtResCMCE4eBqr4Beoa6A89P1YweG9wUS6RqUL", + "hash": "QmQbh3Rb7KM37As3vkHYnEFnzkVXNCP8EYGtHz6g2fXk14", "name": "go-libp2p-metrics", - "version": "1.6.10" + "version": "2.0.0" }, { "author": "whyrusleeping", @@ -205,15 +205,15 @@ }, { "author": "whyrusleeping", - "hash": "QmUwW8jMQDxXhLD2j4EfWqLEMX3MsvyWcWGvJPVDh1aTmu", + "hash": "QmaSxYRuMq4pkpBBG2CYaRrPx2z7NmMVEs34b9g61biQA6", "name": "go-libp2p-host", - "version": "1.3.19" + "version": "2.0.0" }, { "author": "whyrusleeping", - "hash": "QmQUmDr1DMDDy6KMSsJuyV9nVD7dJZ9iWxXESQWPvte2NP", + "hash": "QmW97nvnsknsoN8NUz8CUF5hVVaicLgNaEb6EZMk3oB943", "name": "go-libp2p-swarm", - "version": "1.7.7" + "version": "2.0.2" }, { "author": "whyrusleeping", @@ -223,15 +223,15 @@ }, { "author": "whyrusleeping", - "hash": "QmQ1bJEsmdEiGfTQRoj6CsshWmAKduAEDEbwzbvk5QT5Ui", + "hash": "QmP4cEjmvf8tC6ykxKXrvmYLo8vqtGsgduMatjbAKnBzv8", "name": "go-libp2p-netutil", - "version": "0.2.25" + "version": "0.3.1" }, { "author": "whyrusleeping", - "hash": "QmSmgF5Nnmf1Ygkv96xCmUdPk4QPx3JotTA7sqwXpoxCV2", + "hash": "QmPZRCaYeNLMo5GfcRS2rv9ZxVuXXt6MFg9dWLmgsdXKCw", "name": "go-libp2p-blankhost", - "version": "0.1.18" + "version": "0.2.0" }, { "author": "whyrusleeping", @@ -241,9 +241,9 @@ }, { "author": "whyrusleeping", - "hash": "Qmbn7RYyWzBVXiUp9jZ1dA4VADHy9DtS7iZLwfhEUQvm3U", + "hash": "QmfTJ3UpS5ycNX7uQvPUSSRjGxk9EhUG7SyCstX6tCoNXS", "name": "go-smux-yamux", - "version": "1.2.0" + "version": "2.0.0" }, { "author": "whyrusleeping", @@ -253,15 +253,15 @@ }, { "author": "whyrusleeping", - "hash": "QmRVYfZ7tWNHPBzWiG6KWGzvT2hcGems8srihsQE29x1U5", + "hash": "QmVniQJkdzLZaZwzwMdd3dJTvWiJ1DQEkreVy6hs6h7Vk5", "name": "go-smux-multistream", - "version": "1.5.5" + "version": "2.0.0" }, { "author": "whyrusleeping", - "hash": "QmTuwwqGf4NH2Jj3opKtaKx45ge4RiXSCtvUkb7a4gk2ua", + "hash": "QmYUpfXEBqLdtiSUDzzc8hLfcELPHiPtANF12EpEX1WCVB", "name": "go-libp2p-connmgr", - "version": "0.1.3" + "version": "0.2.0" }, { "author": "whyrusleeping", @@ -271,9 +271,9 @@ }, { "author": "vyzo", - "hash": "QmYkTCcfrPdR5QMasnhh3FVRVNEKzH3YsvuBPpB4YPgwWC", + "hash": "QmVXc7cgEkxWDELn9sGV9r1HbqfQR9YCUmbsrkp1rcXSjn", "name": "go-libp2p-circuit", - "version": "1.1.8" + "version": "2.0.1" }, { "author": "lgierth", @@ -287,6 +287,6 @@ "license": "MIT", "name": "go-libp2p", "releaseCmd": "git commit -a -m \"gx publish $VERSION\"", - "version": "4.5.5" + "version": "5.0.0" }