Commit 860d2784 authored by Steven Allen's avatar Steven Allen
Browse files

make sure to not drop writes on close

Before, on close, we:

1. Weren't completing the write.
2. Flushing the buffer without waiting the latency delay.

This fixes that by using two separate channels for close/reset and ignoring the
close channel in deliverOrWait.
parent 461faf4a
...@@ -17,9 +17,12 @@ type stream struct { ...@@ -17,9 +17,12 @@ type stream struct {
read *io.PipeReader read *io.PipeReader
conn *conn conn *conn
toDeliver chan *transportObject toDeliver chan *transportObject
control chan int
state int reset chan struct{}
closed chan struct{} close chan struct{}
closed chan struct{}
state error
protocol protocol.ID protocol protocol.ID
} }
...@@ -27,12 +30,6 @@ type stream struct { ...@@ -27,12 +30,6 @@ type stream struct {
var ErrReset error = errors.New("stream reset") var ErrReset error = errors.New("stream reset")
var ErrClosed error = errors.New("stream closed") var ErrClosed error = errors.New("stream closed")
const (
stateOpen = iota
stateClose
stateReset
)
type transportObject struct { type transportObject struct {
msg []byte msg []byte
arrivalTime time.Time arrivalTime time.Time
...@@ -42,7 +39,8 @@ func NewStream(w *io.PipeWriter, r *io.PipeReader) *stream { ...@@ -42,7 +39,8 @@ func NewStream(w *io.PipeWriter, r *io.PipeReader) *stream {
s := &stream{ s := &stream{
read: r, read: r,
write: w, write: w,
control: make(chan int), reset: make(chan struct{}, 1),
close: make(chan struct{}, 1),
closed: make(chan struct{}), closed: make(chan struct{}),
toDeliver: make(chan *transportObject), toDeliver: make(chan *transportObject),
} }
...@@ -58,12 +56,7 @@ func (s *stream) Write(p []byte) (n int, err error) { ...@@ -58,12 +56,7 @@ func (s *stream) Write(p []byte) (n int, err error) {
t := time.Now().Add(delay) t := time.Now().Add(delay)
select { select {
case <-s.closed: // bail out if we're closing. case <-s.closed: // bail out if we're closing.
switch s.state { return 0, s.state
case stateReset:
return 0, ErrReset
case stateClose:
return 0, ErrClosed
}
case s.toDeliver <- &transportObject{msg: p, arrivalTime: t}: case s.toDeliver <- &transportObject{msg: p, arrivalTime: t}:
} }
return len(p), nil return len(p), nil
...@@ -79,31 +72,29 @@ func (s *stream) SetProtocol(proto protocol.ID) { ...@@ -79,31 +72,29 @@ func (s *stream) SetProtocol(proto protocol.ID) {
func (s *stream) Close() error { func (s *stream) Close() error {
select { select {
case s.control <- stateClose: case s.close <- struct{}{}:
case <-s.closed: default:
} }
<-s.closed <-s.closed
if s.state == stateReset { if s.state != ErrClosed {
return nil return s.state
} else {
return ErrClosed
} }
return nil
} }
func (s *stream) Reset() error { func (s *stream) Reset() error {
// Cancel any pending reads. // Cancel any pending writes.
s.write.Close() s.write.Close()
select { select {
case s.control <- stateReset: case s.reset <- struct{}{}:
case <-s.closed: default:
} }
<-s.closed <-s.closed
if s.state == stateReset { if s.state != ErrReset {
return nil return s.state
} else {
return ErrClosed
} }
return nil
} }
func (s *stream) teardown() { func (s *stream) teardown() {
...@@ -128,11 +119,11 @@ func (s *stream) SetDeadline(t time.Time) error { ...@@ -128,11 +119,11 @@ func (s *stream) SetDeadline(t time.Time) error {
return &net.OpError{Op: "set", Net: "pipe", Source: nil, Addr: nil, Err: errors.New("deadline not supported")} return &net.OpError{Op: "set", Net: "pipe", Source: nil, Addr: nil, Err: errors.New("deadline not supported")}
} }
func (p *stream) SetReadDeadline(t time.Time) error { func (s *stream) SetReadDeadline(t time.Time) error {
return &net.OpError{Op: "set", Net: "pipe", Source: nil, Addr: nil, Err: errors.New("deadline not supported")} return &net.OpError{Op: "set", Net: "pipe", Source: nil, Addr: nil, Err: errors.New("deadline not supported")}
} }
func (p *stream) SetWriteDeadline(t time.Time) error { func (s *stream) SetWriteDeadline(t time.Time) error {
return &net.OpError{Op: "set", Net: "pipe", Source: nil, Addr: nil, Err: errors.New("deadline not supported")} return &net.OpError{Op: "set", Net: "pipe", Source: nil, Addr: nil, Err: errors.New("deadline not supported")}
} }
...@@ -197,7 +188,8 @@ func (s *stream) transport() { ...@@ -197,7 +188,8 @@ func (s *stream) transport() {
if buffered >= bufsize { if buffered >= bufsize {
select { select {
case <-timer.C: case <-timer.C:
case s.state = <-s.control: case <-s.reset:
s.reset <- struct{}{}
return return
} }
drainBuf() drainBuf()
...@@ -212,25 +204,25 @@ func (s *stream) transport() { ...@@ -212,25 +204,25 @@ func (s *stream) transport() {
} }
for { for {
switch s.state { // Reset takes precedent.
case stateClose: select {
drainBuf() case <-s.reset:
return s.state = ErrReset
case stateReset:
s.read.CloseWithError(ErrReset) s.read.CloseWithError(ErrReset)
return return
default: default:
panic("invalid state")
case stateOpen:
} }
select { select {
case s.state = <-s.control: case <-s.reset:
continue s.state = ErrReset
case o, ok := <-s.toDeliver: s.read.CloseWithError(ErrReset)
if !ok { return
return case <-s.close:
} s.state = ErrClosed
drainBuf()
return
case o := <-s.toDeliver:
deliverOrWait(o) deliverOrWait(o)
case <-timer.C: // ok, due to write it out. case <-timer.C: // ok, due to write it out.
drainBuf() drainBuf()
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment