Commit 461faf4a authored by Steven Allen's avatar Steven Allen Committed by GitHub
Browse files

Merge pull request #230 from libp2p/feat/update-stream-muxer

update go-stream-muxer
parents 958008b5 664afd4c
4.5.5: QmXZyBQMkqSYigxhJResC6fLWDGFhbphK67eZoqMDUvBmK
5.0.0: QmTykKqiBX2QyDjrJDKX6qzwU2ybMtWqaLET23mgyDSojx
......@@ -134,8 +134,12 @@ func main() {
// a user-defined protocol name.
ha.SetStreamHandler("/echo/1.0.0", func(s net.Stream) {
log.Println("Got a new stream!")
defer s.Close()
doEcho(s)
if err := doEcho(s); err != nil {
log.Println(err)
s.Reset()
} else {
s.Close()
}
})
if *target == "" {
......@@ -194,18 +198,14 @@ func main() {
}
// doEcho reads a line of data from a stream and writes it back
func doEcho(s net.Stream) {
func doEcho(s net.Stream) error {
buf := bufio.NewReader(s)
str, err := buf.ReadString('\n')
if err != nil {
log.Println(err)
return
return err
}
log.Printf("read: %s\n", str)
_, err = s.Write([]byte(str))
if err != nil {
log.Println(err)
return
}
return err
}
......@@ -109,6 +109,7 @@ func streamHandler(stream inet.Stream) {
// Read the HTTP request from the buffer
req, err := http.ReadRequest(buf)
if err != nil {
stream.Reset()
log.Println(err)
return
}
......@@ -132,6 +133,7 @@ func streamHandler(stream inet.Stream) {
fmt.Printf("Making request to %s\n", req.URL)
resp, err := http.DefaultTransport.RoundTrip(outreq)
if err != nil {
stream.Reset()
log.Println(err)
return
}
......@@ -176,6 +178,7 @@ func (p *ProxyService) ServeHTTP(w http.ResponseWriter, r *http.Request) {
// r.Write() writes the HTTP request to the stream.
err = r.Write(stream)
if err != nil {
stream.Reset()
log.Println(err)
http.Error(w, err.Error(), http.StatusServiceUnavailable)
return
......@@ -186,6 +189,7 @@ func (p *ProxyService) ServeHTTP(w http.ResponseWriter, r *http.Request) {
buf := bufio.NewReader(stream)
resp, err := http.ReadResponse(buf, r)
if err != nil {
stream.Reset()
log.Println(err)
http.Error(w, err.Error(), http.StatusServiceUnavailable)
return
......
......@@ -240,7 +240,7 @@ func (h *BasicHost) newStreamHandler(s inet.Stream) {
if h.negtimeout > 0 {
if err := s.SetDeadline(time.Now().Add(h.negtimeout)); err != nil {
log.Error("setting stream deadline: ", err)
s.Close()
s.Reset()
return
}
}
......@@ -257,7 +257,7 @@ func (h *BasicHost) newStreamHandler(s inet.Stream) {
} else {
log.Warning("protocol mux failed: %s (took %s)", err, took)
}
s.Close()
s.Reset()
return
}
......@@ -269,7 +269,7 @@ func (h *BasicHost) newStreamHandler(s inet.Stream) {
if h.negtimeout > 0 {
if err := s.SetDeadline(time.Time{}); err != nil {
log.Error("resetting stream deadline: ", err)
s.Close()
s.Reset()
return
}
}
......@@ -364,7 +364,7 @@ func (h *BasicHost) NewStream(ctx context.Context, p peer.ID, pids ...protocol.I
selected, err := msmux.SelectOneOf(protoStrs, s)
if err != nil {
s.Close()
s.Reset()
return nil, err
}
selpid := protocol.ID(selected)
......
......@@ -180,7 +180,7 @@ func TestHostProtoMismatch(t *testing.T) {
h1.SetStreamHandler("/super", func(s inet.Stream) {
t.Error("shouldnt get here")
s.Close()
s.Reset()
})
_, err := h2.NewStream(ctx, h1.ID(), "/foo", "/bar", "/baz/1.0.0")
......
......@@ -54,7 +54,7 @@ func (c *conn) Close() error {
func (c *conn) teardown() error {
for _, s := range c.allStreams() {
s.Close()
s.Reset()
}
c.net.removeConn(c)
c.net.notifyAll(func(n inet.Notifiee) {
......
......@@ -2,7 +2,7 @@ package mocknet
import (
// "fmt"
"net"
"io"
"sync"
"time"
......@@ -45,11 +45,12 @@ func (l *link) newConnPair(dialer *peernet) (*conn, *conn) {
}
func (l *link) newStreamPair() (*stream, *stream) {
a, b := net.Pipe()
ra, wb := io.Pipe()
rb, wa := io.Pipe()
s1 := NewStream(a)
s2 := NewStream(b)
return s1, s2
sa := NewStream(wa, ra)
sb := NewStream(wb, rb)
return sa, sb
}
func (l *link) Networks() []inet.Network {
......
......@@ -2,38 +2,52 @@ package mocknet
import (
"bytes"
"errors"
"io"
"net"
"time"
process "github.com/jbenet/goprocess"
inet "github.com/libp2p/go-libp2p-net"
protocol "github.com/libp2p/go-libp2p-protocol"
)
// stream implements inet.Stream
type stream struct {
Pipe net.Conn
write *io.PipeWriter
read *io.PipeReader
conn *conn
toDeliver chan *transportObject
proc process.Process
control chan int
state int
closed chan struct{}
protocol protocol.ID
}
var ErrReset error = errors.New("stream reset")
var ErrClosed error = errors.New("stream closed")
const (
stateOpen = iota
stateClose
stateReset
)
type transportObject struct {
msg []byte
arrivalTime time.Time
}
func NewStream(p net.Conn) *stream {
func NewStream(w *io.PipeWriter, r *io.PipeReader) *stream {
s := &stream{
Pipe: p,
read: r,
write: w,
control: make(chan int),
closed: make(chan struct{}),
toDeliver: make(chan *transportObject),
}
s.proc = process.WithTeardown(s.teardown)
s.proc.Go(s.transport)
go s.transport()
return s
}
......@@ -43,8 +57,13 @@ func (s *stream) Write(p []byte) (n int, err error) {
delay := l.GetLatency() + l.RateLimit(len(p))
t := time.Now().Add(delay)
select {
case <-s.proc.Closing(): // bail out if we're closing.
return 0, io.ErrClosedPipe
case <-s.closed: // bail out if we're closing.
switch s.state {
case stateReset:
return 0, ErrReset
case stateClose:
return 0, ErrClosed
}
case s.toDeliver <- &transportObject{msg: p, arrivalTime: t}:
}
return len(p), nil
......@@ -59,21 +78,46 @@ func (s *stream) SetProtocol(proto protocol.ID) {
}
func (s *stream) Close() error {
return s.proc.Close()
select {
case s.control <- stateClose:
case <-s.closed:
}
<-s.closed
if s.state == stateReset {
return nil
} else {
return ErrClosed
}
}
// teardown shuts down the stream. it is called by s.proc.Close()
// after all the children of this s.proc (i.e. transport's proc)
// are done.
func (s *stream) teardown() error {
// at this point, no streams are writing.
func (s *stream) Reset() error {
// Cancel any pending reads.
s.write.Close()
select {
case s.control <- stateReset:
case <-s.closed:
}
<-s.closed
if s.state == stateReset {
return nil
} else {
return ErrClosed
}
}
func (s *stream) teardown() {
s.write.Close()
// at this point, no streams are writing.
s.conn.removeStream(s)
s.Pipe.Close()
// Mark as closed.
close(s.closed)
s.conn.net.notifyAll(func(n inet.Notifiee) {
n.ClosedStream(s.conn.net, s)
})
return nil
}
func (s *stream) Conn() inet.Conn {
......@@ -81,33 +125,44 @@ func (s *stream) Conn() inet.Conn {
}
func (s *stream) SetDeadline(t time.Time) error {
return s.Pipe.SetDeadline(t)
return &net.OpError{Op: "set", Net: "pipe", Source: nil, Addr: nil, Err: errors.New("deadline not supported")}
}
func (s *stream) SetReadDeadline(t time.Time) error {
return s.Pipe.SetReadDeadline(t)
func (p *stream) SetReadDeadline(t time.Time) error {
return &net.OpError{Op: "set", Net: "pipe", Source: nil, Addr: nil, Err: errors.New("deadline not supported")}
}
func (s *stream) SetWriteDeadline(t time.Time) error {
return s.Pipe.SetWriteDeadline(t)
func (p *stream) SetWriteDeadline(t time.Time) error {
return &net.OpError{Op: "set", Net: "pipe", Source: nil, Addr: nil, Err: errors.New("deadline not supported")}
}
func (s *stream) Read(b []byte) (int, error) {
return s.Pipe.Read(b)
return s.read.Read(b)
}
// transport will grab message arrival times, wait until that time, and
// then write the message out when it is scheduled to arrive
func (s *stream) transport(proc process.Process) {
func (s *stream) transport() {
defer s.teardown()
bufsize := 256
buf := new(bytes.Buffer)
ticker := time.NewTicker(time.Millisecond * 4)
timer := time.NewTimer(0)
if !timer.Stop() {
select {
case <-timer.C:
default:
}
}
// cleanup
defer timer.Stop()
// drainBuf writes the contents of buf through to s.write.
// done only when arrival time makes sense.
drainBuf := func() {
if buf.Len() > 0 {
_, err := s.Pipe.Write(buf.Bytes())
_, err := s.write.Write(buf.Bytes())
if err != nil {
return
}
......@@ -121,44 +176,63 @@ func (s *stream) transport(proc process.Process) {
deliverOrWait := func(o *transportObject) {
buffered := len(o.msg) + buf.Len()
now := time.Now()
if now.Before(o.arrivalTime) {
if buffered < bufsize {
buf.Write(o.msg)
return
// Yes, we can end up extending a timer multiple times if we
// keep on making small writes but that shouldn't be too much of an
// issue. Fixing that would be painful.
if !timer.Stop() {
// FIXME: So, we *shouldn't* need to do this but we hang
// here if we don't... Go bug?
select {
case <-timer.C:
default:
}
// we do not buffer + return here, instead hanging the
// call (i.e. not accepting any more transportObjects)
// so that we apply back-pressure to the sender.
// this sleep should wake up same time as ticker.
time.Sleep(o.arrivalTime.Sub(now))
}
delay := o.arrivalTime.Sub(time.Now())
if delay >= 0 {
timer.Reset(delay)
} else {
timer.Reset(0)
}
// ok, we waited our due time. now write the buf + msg.
// drainBuf first, before we write this message.
drainBuf()
// write this message.
_, err := s.Pipe.Write(o.msg)
if err != nil {
log.Error("mock_stream", err)
if buffered >= bufsize {
select {
case <-timer.C:
case s.state = <-s.control:
return
}
drainBuf()
// write this message.
_, err := s.write.Write(o.msg)
if err != nil {
log.Error("mock_stream", err)
}
} else {
buf.Write(o.msg)
}
}
for {
select {
case <-proc.Closing():
return // bail out of here.
switch s.state {
case stateClose:
drainBuf()
return
case stateReset:
s.read.CloseWithError(ErrReset)
return
default:
panic("invalid state")
case stateOpen:
}
select {
case s.state = <-s.control:
continue
case o, ok := <-s.toDeliver:
if !ok {
return
}
deliverOrWait(o)
case <-ticker.C: // ok, due to write it out.
case <-timer.C: // ok, due to write it out.
drainBuf()
}
}
......
......@@ -33,33 +33,42 @@ func NewPingService(h host.Host) *PingService {
}
func (p *PingService) PingHandler(s inet.Stream) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
buf := make([]byte, PingSize)
errCh := make(chan error, 1)
defer close(errCh)
timer := time.NewTimer(pingTimeout)
defer timer.Stop()
go func() {
select {
case <-timer.C:
case <-ctx.Done():
log.Debug("ping timeout")
s.Reset()
case err, ok := <-errCh:
if ok {
log.Debug(err)
if err == io.EOF {
s.Close()
} else {
s.Reset()
}
} else {
log.Error("ping loop failed without error")
}
}
s.Close()
}()
for {
_, err := io.ReadFull(s, buf)
if err != nil {
log.Debug(err)
errCh <- err
return
}
_, err = s.Write(buf)
if err != nil {
log.Debug(err)
errCh <- err
return
}
......@@ -84,6 +93,7 @@ func (ps *PingService) Ping(ctx context.Context, p peer.ID) (<-chan time.Duratio
default:
t, err := ping(s)
if err != nil {
s.Reset()
log.Debugf("ping error: %s", err)
return
}
......
......@@ -31,8 +31,12 @@ func EchoStreamHandler(stream inet.Stream) {
c := stream.Conn()
log.Debugf("%s echoing %s", c.LocalPeer(), c.RemotePeer())
go func() {
defer stream.Close()
io.Copy(stream, stream)
_, err := io.Copy(stream, stream)
if err == nil {
stream.Close()
} else {
stream.Reset()
}
}()
}
......
......@@ -109,9 +109,9 @@
"version": "0.0.0"
},
{
"hash": "QmVNPgPmEG4QKaDKkxMPKY34Z53n8efzv1sEh4NTsdhto7",
"hash": "QmTMNkpso2WRMevXC8ZxgyBhJvoEHvk24SNeUr9Mf9UM1a",
"name": "go-peerstream",
"version": "1.7.0"
"version": "2.0.2"
},
{
"author": "whyrusleeping",
......@@ -151,9 +151,9 @@
},
{
"author": "whyrusleeping",
"hash": "QmdQcv14hCd41WEzNA4avJohJR5sdPqVgFtXZtDz6MTCKx",
"hash": "QmbrUTiVDSK3WGePN18qVjpGYmvXQt6YVPyyGoXWx593uq",
"name": "go-tcp-transport",
"version": "1.2.2"
"version": "1.2.3"
},
{
"author": "whyrusleeping",
......@@ -181,21 +181,21 @@
},
{
"author": "whyrusleeping",
"hash": "QmX49btJy5UQuHYmWxrNTmpcnUE5a4upGS6xPYH7mPE46D",
"hash": "QmTi4629yyHJ8qW9sXFjvxJpYcN499tHhERLZYdUqwRU9i",
"name": "go-libp2p-conn",
"version": "1.6.12"
"version": "1.6.13"
},
{
"author": "whyrusleeping",
"hash": "QmahYsGWry85Y7WUe2SX5G4JkH2zifEQAUtJVLZ24aC9DF",
"hash": "QmNa31VPzC561NWwRsJLE7nGYZYuuD2QfpK2b1q9BK54J1",
"name": "go-libp2p-net",
"version": "1.6.12"
"version": "2.0.0"
},
{
"author": "whyrusleeping",
"hash": "QmVjRAPfRtResCMCE4eBqr4Beoa6A89P1YweG9wUS6RqUL",
"hash": "QmQbh3Rb7KM37As3vkHYnEFnzkVXNCP8EYGtHz6g2fXk14",
"name": "go-libp2p-metrics",
"version": "1.6.10"
"version": "2.0.0"
},
{
"author": "whyrusleeping",
......@@ -205,15 +205,15 @@
},
{
"author": "whyrusleeping",
"hash": "QmUwW8jMQDxXhLD2j4EfWqLEMX3MsvyWcWGvJPVDh1aTmu",
"hash": "QmaSxYRuMq4pkpBBG2CYaRrPx2z7NmMVEs34b9g61biQA6",
"name": "go-libp2p-host",
"version": "1.3.19"
"version": "2.0.0"
},
{
"author": "whyrusleeping",
"hash": "QmQUmDr1DMDDy6KMSsJuyV9nVD7dJZ9iWxXESQWPvte2NP",
"hash": "QmW97nvnsknsoN8NUz8CUF5hVVaicLgNaEb6EZMk3oB943",
"name": "go-libp2p-swarm",
"version": "1.7.7"
"version": "2.0.2"
},
{
"author": "whyrusleeping",
......@@ -223,15 +223,15 @@
},
{
"author": "whyrusleeping",
"hash": "QmQ1bJEsmdEiGfTQRoj6CsshWmAKduAEDEbwzbvk5QT5Ui",
"hash": "QmP4cEjmvf8tC6ykxKXrvmYLo8vqtGsgduMatjbAKnBzv8",
"name": "go-libp2p-netutil",
"version": "0.2.25"
"version": "0.3.1"
},
{
"author": "whyrusleeping",
"hash": "QmSmgF5Nnmf1Ygkv96xCmUdPk4QPx3JotTA7sqwXpoxCV2",
"hash": "QmPZRCaYeNLMo5GfcRS2rv9ZxVuXXt6MFg9dWLmgsdXKCw",
"name": "go-libp2p-blankhost",
"version": "0.1.18"
"version": "0.2.0"
},
{
"author": "whyrusleeping",
......@@ -241,9 +241,9 @@
},
{
"author": "whyrusleeping",
"hash": "Qmbn7RYyWzBVXiUp9jZ1dA4VADHy9DtS7iZLwfhEUQvm3U",
"hash": "QmfTJ3UpS5ycNX7uQvPUSSRjGxk9EhUG7SyCstX6tCoNXS",
"name": "go-smux-yamux",
"version": "1.2.0"
"version": "2.0.0"
},
{
"author": "whyrusleeping",
......@@ -253,15 +253,15 @@
},
{
"author": "whyrusleeping",
"hash": "QmRVYfZ7tWNHPBzWiG6KWGzvT2hcGems8srihsQE29x1U5",
"hash": "QmVniQJkdzLZaZwzwMdd3dJTvWiJ1DQEkreVy6hs6h7Vk5",
"name": "go-smux-multistream",
"version": "1.5.5"
"version": "2.0.0"
},
{
"author": "whyrusleeping",
"hash": "QmTuwwqGf4NH2Jj3opKtaKx45ge4RiXSCtvUkb7a4gk2ua",
"hash": "QmYUpfXEBqLdtiSUDzzc8hLfcELPHiPtANF12EpEX1WCVB",
"name": "go-libp2p-connmgr",
"version": "0.1.3"
"version": "0.2.0"
},
{
"author": "whyrusleeping",
......@@ -271,9 +271,9 @@
},
{
"author": "vyzo",
"hash": "QmYkTCcfrPdR5QMasnhh3FVRVNEKzH3YsvuBPpB4YPgwWC",
"hash": "QmVXc7cgEkxWDELn9sGV9r1HbqfQR9YCUmbsrkp1rcXSjn",
"name": "go-libp2p-circuit",
"version": "1.1.8"
"version": "2.0.1"
},
{
"author": "lgierth",
......@@ -287,6 +287,6 @@
"license": "MIT",
"name": "go-libp2p",
"releaseCmd": "git commit -a -m \"gx publish $VERSION\"",
"version": "4.5.5"
"version": "5.0.0"
}
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment