mock_stream.go 4.97 KB
Newer Older
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
1
2
3
package mocknet

import (
Karthik Bala's avatar
Karthik Bala committed
4
	"bytes"
Steven Allen's avatar
Steven Allen committed
5
	"errors"
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
6
	"io"
7
	"net"
Karthik Bala's avatar
Karthik Bala committed
8
9
	"time"

Jeromy's avatar
Jeromy committed
10
	inet "github.com/libp2p/go-libp2p-net"
Jeromy's avatar
Jeromy committed
11
	protocol "github.com/libp2p/go-libp2p-protocol"
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
12
13
14
15
)

// stream implements inet.Stream
type stream struct {
Steven Allen's avatar
Steven Allen committed
16
17
	write     *io.PipeWriter
	read      *io.PipeReader
Karthik Bala's avatar
Karthik Bala committed
18
19
	conn      *conn
	toDeliver chan *transportObject
20
21
22
23
24

	reset  chan struct{}
	close  chan struct{}
	closed chan struct{}

25
	writeErr error
26

27
	protocol protocol.ID
Karthik Bala's avatar
Karthik Bala committed
28
29
}

Steven Allen's avatar
Steven Allen committed
30
31
32
// Sentinel errors describing how a stream stopped.
var (
	// ErrReset is the error handed to the stream's pipes, and recorded as
	// the write error, when the stream is reset.
	ErrReset = errors.New("stream reset")

	// ErrClosed is recorded as the write error after a clean close.
	ErrClosed = errors.New("stream closed")
)

Karthik Bala's avatar
Karthik Bala committed
33
34
35
36
37
// transportObject is one queued write travelling from Write to the
// transport goroutine.
type transportObject struct {
	// msg is the payload to deliver.
	msg         []byte
	// arrivalTime is when the payload is scheduled to be written out.
	arrivalTime time.Time
}

Steven Allen's avatar
Steven Allen committed
38
func NewStream(w *io.PipeWriter, r *io.PipeReader) *stream {
Karthik Bala's avatar
Karthik Bala committed
39
	s := &stream{
Steven Allen's avatar
Steven Allen committed
40
41
		read:      r,
		write:     w,
42
43
		reset:     make(chan struct{}, 1),
		close:     make(chan struct{}, 1),
Steven Allen's avatar
Steven Allen committed
44
		closed:    make(chan struct{}),
Karthik Bala's avatar
Karthik Bala committed
45
46
47
		toDeliver: make(chan *transportObject),
	}

Steven Allen's avatar
Steven Allen committed
48
	go s.transport()
Karthik Bala's avatar
Karthik Bala committed
49
50
51
52
53
54
55
56
57
	return s
}

//  How to handle errors with writes?
func (s *stream) Write(p []byte) (n int, err error) {
	l := s.conn.link
	delay := l.GetLatency() + l.RateLimit(len(p))
	t := time.Now().Add(delay)
	select {
Steven Allen's avatar
Steven Allen committed
58
	case <-s.closed: // bail out if we're closing.
59
		return 0, s.writeErr
Karthik Bala's avatar
Karthik Bala committed
60
61
62
	case s.toDeliver <- &transportObject{msg: p, arrivalTime: t}:
	}
	return len(p), nil
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
63
64
}

65
// Protocol returns the protocol ID currently stored on the stream.
func (s *stream) Protocol() protocol.ID {
	return s.protocol
}

69
// SetProtocol stores proto as the stream's protocol ID.
func (s *stream) SetProtocol(proto protocol.ID) {
	s.protocol = proto
}

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
73
func (s *stream) Close() error {
Steven Allen's avatar
Steven Allen committed
74
	select {
75
76
	case s.close <- struct{}{}:
	default:
Steven Allen's avatar
Steven Allen committed
77
78
	}
	<-s.closed
79
80
	if s.writeErr != ErrClosed {
		return s.writeErr
Steven Allen's avatar
Steven Allen committed
81
	}
82
	return nil
Karthik Bala's avatar
Karthik Bala committed
83
84
}

Steven Allen's avatar
Steven Allen committed
85
func (s *stream) Reset() error {
86
87
88
	// Cancel any pending reads/writes with an error.
	s.write.CloseWithError(ErrReset)
	s.read.CloseWithError(ErrReset)
Karthik Bala's avatar
Karthik Bala committed
89

Steven Allen's avatar
Steven Allen committed
90
	select {
91
92
	case s.reset <- struct{}{}:
	default:
Steven Allen's avatar
Steven Allen committed
93
94
	}
	<-s.closed
95
96

	// No meaningful error case here.
97
	return nil
Steven Allen's avatar
Steven Allen committed
98
99
100
101
}

func (s *stream) teardown() {
	// at this point, no streams are writing.
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
102
	s.conn.removeStream(s)
Steven Allen's avatar
Steven Allen committed
103
104
105
106

	// Mark as closed.
	close(s.closed)

107
108
109
	s.conn.net.notifyAll(func(n inet.Notifiee) {
		n.ClosedStream(s.conn.net, s)
	})
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
110
111
112
113
114
}

// Conn returns the stream's conn as an inet.Conn.
func (s *stream) Conn() inet.Conn {
	return s.conn
}
Karthik Bala's avatar
Karthik Bala committed
115

116
func (s *stream) SetDeadline(t time.Time) error {
Steven Allen's avatar
Steven Allen committed
117
	return &net.OpError{Op: "set", Net: "pipe", Source: nil, Addr: nil, Err: errors.New("deadline not supported")}
118
119
}

120
func (s *stream) SetReadDeadline(t time.Time) error {
Steven Allen's avatar
Steven Allen committed
121
	return &net.OpError{Op: "set", Net: "pipe", Source: nil, Addr: nil, Err: errors.New("deadline not supported")}
122
123
}

124
func (s *stream) SetWriteDeadline(t time.Time) error {
Steven Allen's avatar
Steven Allen committed
125
	return &net.OpError{Op: "set", Net: "pipe", Source: nil, Addr: nil, Err: errors.New("deadline not supported")}
126
127
128
}

func (s *stream) Read(b []byte) (int, error) {
Steven Allen's avatar
Steven Allen committed
129
	return s.read.Read(b)
130
131
}

Karthik Bala's avatar
Karthik Bala committed
132
133
// transport will grab message arrival times, wait until that time, and
// then write the message out when it is scheduled to arrive
Steven Allen's avatar
Steven Allen committed
134
135
136
func (s *stream) transport() {
	defer s.teardown()

Karthik Bala's avatar
Karthik Bala committed
137
138
	bufsize := 256
	buf := new(bytes.Buffer)
Steven Allen's avatar
Steven Allen committed
139
140
141
142
143
144
145
146
147
148
	timer := time.NewTimer(0)
	if !timer.Stop() {
		select {
		case <-timer.C:
		default:
		}
	}

	// cleanup
	defer timer.Stop()
Karthik Bala's avatar
Karthik Bala committed
149
150
151

	// writeBuf writes the contents of buf through to the s.Writer.
	// done only when arrival time makes sense.
152
	drainBuf := func() error {
Karthik Bala's avatar
Karthik Bala committed
153
		if buf.Len() > 0 {
Steven Allen's avatar
Steven Allen committed
154
			_, err := s.write.Write(buf.Bytes())
Karthik Bala's avatar
Karthik Bala committed
155
			if err != nil {
156
				return err
Karthik Bala's avatar
Karthik Bala committed
157
158
159
			}
			buf.Reset()
		}
160
		return nil
Karthik Bala's avatar
Karthik Bala committed
161
162
163
164
165
	}

	// deliverOrWait is a helper func that processes
	// an incoming packet. it waits until the arrival time,
	// and then writes things out.
166
	deliverOrWait := func(o *transportObject) error {
Karthik Bala's avatar
Karthik Bala committed
167
168
		buffered := len(o.msg) + buf.Len()

Steven Allen's avatar
Steven Allen committed
169
170
171
172
173
174
175
176
177
		// Yes, we can end up extending a timer multiple times if we
		// keep on making small writes but that shouldn't be too much of an
		// issue. Fixing that would be painful.
		if !timer.Stop() {
			// FIXME: So, we *shouldn't* need to do this but we hang
			// here if we don't... Go bug?
			select {
			case <-timer.C:
			default:
Karthik Bala's avatar
Karthik Bala committed
178
			}
Steven Allen's avatar
Steven Allen committed
179
180
181
182
183
184
		}
		delay := o.arrivalTime.Sub(time.Now())
		if delay >= 0 {
			timer.Reset(delay)
		} else {
			timer.Reset(0)
Karthik Bala's avatar
Karthik Bala committed
185
186
		}

Steven Allen's avatar
Steven Allen committed
187
188
189
		if buffered >= bufsize {
			select {
			case <-timer.C:
190
			case <-s.reset:
191
192
193
194
195
196
197
198
				select {
				case s.reset <- struct{}{}:
				default:
				}
				return ErrReset
			}
			if err := drainBuf(); err != nil {
				return err
Steven Allen's avatar
Steven Allen committed
199
200
201
202
			}
			// write this message.
			_, err := s.write.Write(o.msg)
			if err != nil {
203
				return err
Steven Allen's avatar
Steven Allen committed
204
205
206
			}
		} else {
			buf.Write(o.msg)
Karthik Bala's avatar
Karthik Bala committed
207
		}
208
		return nil
Karthik Bala's avatar
Karthik Bala committed
209
210
211
	}

	for {
212
213
214
		// Reset takes precedent.
		select {
		case <-s.reset:
215
			s.writeErr = ErrReset
Steven Allen's avatar
Steven Allen committed
216
217
218
			return
		default:
		}
Karthik Bala's avatar
Karthik Bala committed
219

Steven Allen's avatar
Steven Allen committed
220
		select {
221
		case <-s.reset:
222
			s.writeErr = ErrReset
223
224
			return
		case <-s.close:
225
226
227
228
229
230
231
232
			if err := drainBuf(); err != nil {
				s.resetWith(err)
				return
			}
			s.writeErr = s.write.Close()
			if s.writeErr == nil {
				s.writeErr = ErrClosed
			}
233
234
			return
		case o := <-s.toDeliver:
235
236
237
238
			if err := deliverOrWait(o); err != nil {
				s.resetWith(err)
				return
			}
Steven Allen's avatar
Steven Allen committed
239
		case <-timer.C: // ok, due to write it out.
240
241
242
243
			if err := drainBuf(); err != nil {
				s.resetWith(err)
				return
			}
Karthik Bala's avatar
Karthik Bala committed
244
245
246
		}
	}
}
247
248
249
250
251
252

// resetWith aborts both pipe ends with err and records err as the stream's
// write error.
func (s *stream) resetWith(err error) {
	// The results of CloseWithError are deliberately discarded.
	_ = s.write.CloseWithError(err)
	_ = s.read.CloseWithError(err)

	s.writeErr = err
}