mock_stream.go 5.16 KB
Newer Older
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
1
2
3
package mocknet

import (
Karthik Bala's avatar
Karthik Bala committed
4
	"bytes"
Steven Allen's avatar
Steven Allen committed
5
	"errors"
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
6
	"io"
7
	"net"
Karthik Bala's avatar
Karthik Bala committed
8
9
	"time"

Jeromy's avatar
Jeromy committed
10
	inet "github.com/libp2p/go-libp2p-net"
Jeromy's avatar
Jeromy committed
11
	protocol "github.com/libp2p/go-libp2p-protocol"
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
12
13
14
15
)

// stream implements inet.Stream
type stream struct {
Steven Allen's avatar
Steven Allen committed
16
17
	write     *io.PipeWriter
	read      *io.PipeReader
Karthik Bala's avatar
Karthik Bala committed
18
19
	conn      *conn
	toDeliver chan *transportObject
20
21
22
23
24

	reset  chan struct{}
	close  chan struct{}
	closed chan struct{}

25
	writeErr error
26

27
	protocol protocol.ID
28
	stat     inet.Stat
Karthik Bala's avatar
Karthik Bala committed
29
30
}

Steven Allen's avatar
Steven Allen committed
31
32
33
// Sentinel errors recorded by a stream once it has stopped.
var (
	// ErrReset is the error used when a stream is torn down by Reset.
	ErrReset = errors.New("stream reset")

	// ErrClosed is recorded as the write error after a clean Close.
	ErrClosed = errors.New("stream closed")
)

Karthik Bala's avatar
Karthik Bala committed
34
35
36
37
38
type transportObject struct {
	msg         []byte
	arrivalTime time.Time
}

39
func NewStream(w *io.PipeWriter, r *io.PipeReader, dir inet.Direction) *stream {
Karthik Bala's avatar
Karthik Bala committed
40
	s := &stream{
Steven Allen's avatar
Steven Allen committed
41
42
		read:      r,
		write:     w,
43
44
		reset:     make(chan struct{}, 1),
		close:     make(chan struct{}, 1),
Steven Allen's avatar
Steven Allen committed
45
		closed:    make(chan struct{}),
Karthik Bala's avatar
Karthik Bala committed
46
		toDeliver: make(chan *transportObject),
47
		stat:      inet.Stat{Direction: dir},
Karthik Bala's avatar
Karthik Bala committed
48
49
	}

Steven Allen's avatar
Steven Allen committed
50
	go s.transport()
Karthik Bala's avatar
Karthik Bala committed
51
52
53
54
55
56
57
58
	return s
}

//  How to handle errors with writes?
func (s *stream) Write(p []byte) (n int, err error) {
	l := s.conn.link
	delay := l.GetLatency() + l.RateLimit(len(p))
	t := time.Now().Add(delay)
Steven Allen's avatar
Steven Allen committed
59
60
61
62
63

	// Copy it.
	cpy := make([]byte, len(p))
	copy(cpy, p)

Karthik Bala's avatar
Karthik Bala committed
64
	select {
Steven Allen's avatar
Steven Allen committed
65
	case <-s.closed: // bail out if we're closing.
66
		return 0, s.writeErr
Steven Allen's avatar
Steven Allen committed
67
	case s.toDeliver <- &transportObject{msg: cpy, arrivalTime: t}:
Karthik Bala's avatar
Karthik Bala committed
68
69
	}
	return len(p), nil
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
70
71
}

72
// Protocol returns the protocol ID currently stored on the stream.
func (s *stream) Protocol() protocol.ID {
	return s.protocol
}

76
77
78
79
// Stat returns a copy of the stream's metadata.
func (s *stream) Stat() inet.Stat {
	stat := s.stat
	return stat
}

80
// SetProtocol stores proto as the stream's protocol ID.
func (s *stream) SetProtocol(proto protocol.ID) {
	s.protocol = proto
}

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
84
func (s *stream) Close() error {
Steven Allen's avatar
Steven Allen committed
85
	select {
86
87
	case s.close <- struct{}{}:
	default:
Steven Allen's avatar
Steven Allen committed
88
89
	}
	<-s.closed
90
91
	if s.writeErr != ErrClosed {
		return s.writeErr
Steven Allen's avatar
Steven Allen committed
92
	}
93
	return nil
Karthik Bala's avatar
Karthik Bala committed
94
95
}

Steven Allen's avatar
Steven Allen committed
96
func (s *stream) Reset() error {
97
98
99
	// Cancel any pending reads/writes with an error.
	s.write.CloseWithError(ErrReset)
	s.read.CloseWithError(ErrReset)
Karthik Bala's avatar
Karthik Bala committed
100

Steven Allen's avatar
Steven Allen committed
101
	select {
102
103
	case s.reset <- struct{}{}:
	default:
Steven Allen's avatar
Steven Allen committed
104
105
	}
	<-s.closed
106
107

	// No meaningful error case here.
108
	return nil
Steven Allen's avatar
Steven Allen committed
109
110
111
112
}

func (s *stream) teardown() {
	// at this point, no streams are writing.
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
113
	s.conn.removeStream(s)
Steven Allen's avatar
Steven Allen committed
114
115
116
117

	// Mark as closed.
	close(s.closed)

118
119
120
	s.conn.net.notifyAll(func(n inet.Notifiee) {
		n.ClosedStream(s.conn.net, s)
	})
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
121
122
123
124
125
}

// Conn returns the connection this stream belongs to.
func (s *stream) Conn() inet.Conn {
	c := s.conn
	return c
}
Karthik Bala's avatar
Karthik Bala committed
126

127
func (s *stream) SetDeadline(t time.Time) error {
Steven Allen's avatar
Steven Allen committed
128
	return &net.OpError{Op: "set", Net: "pipe", Source: nil, Addr: nil, Err: errors.New("deadline not supported")}
129
130
}

131
func (s *stream) SetReadDeadline(t time.Time) error {
Steven Allen's avatar
Steven Allen committed
132
	return &net.OpError{Op: "set", Net: "pipe", Source: nil, Addr: nil, Err: errors.New("deadline not supported")}
133
134
}

135
func (s *stream) SetWriteDeadline(t time.Time) error {
Steven Allen's avatar
Steven Allen committed
136
	return &net.OpError{Op: "set", Net: "pipe", Source: nil, Addr: nil, Err: errors.New("deadline not supported")}
137
138
139
}

func (s *stream) Read(b []byte) (int, error) {
Steven Allen's avatar
Steven Allen committed
140
	return s.read.Read(b)
141
142
}

Karthik Bala's avatar
Karthik Bala committed
143
144
// transport will grab message arrival times, wait until that time, and
// then write the message out when it is scheduled to arrive
Steven Allen's avatar
Steven Allen committed
145
146
147
func (s *stream) transport() {
	defer s.teardown()

Karthik Bala's avatar
Karthik Bala committed
148
149
	bufsize := 256
	buf := new(bytes.Buffer)
Steven Allen's avatar
Steven Allen committed
150
151
152
153
154
155
156
157
158
159
	timer := time.NewTimer(0)
	if !timer.Stop() {
		select {
		case <-timer.C:
		default:
		}
	}

	// cleanup
	defer timer.Stop()
Karthik Bala's avatar
Karthik Bala committed
160
161
162

	// writeBuf writes the contents of buf through to the s.Writer.
	// done only when arrival time makes sense.
163
	drainBuf := func() error {
Karthik Bala's avatar
Karthik Bala committed
164
		if buf.Len() > 0 {
Steven Allen's avatar
Steven Allen committed
165
			_, err := s.write.Write(buf.Bytes())
Karthik Bala's avatar
Karthik Bala committed
166
			if err != nil {
167
				return err
Karthik Bala's avatar
Karthik Bala committed
168
169
170
			}
			buf.Reset()
		}
171
		return nil
Karthik Bala's avatar
Karthik Bala committed
172
173
174
175
176
	}

	// deliverOrWait is a helper func that processes
	// an incoming packet. it waits until the arrival time,
	// and then writes things out.
177
	deliverOrWait := func(o *transportObject) error {
Karthik Bala's avatar
Karthik Bala committed
178
179
		buffered := len(o.msg) + buf.Len()

Steven Allen's avatar
Steven Allen committed
180
181
182
183
184
185
186
187
188
		// Yes, we can end up extending a timer multiple times if we
		// keep on making small writes but that shouldn't be too much of an
		// issue. Fixing that would be painful.
		if !timer.Stop() {
			// FIXME: So, we *shouldn't* need to do this but we hang
			// here if we don't... Go bug?
			select {
			case <-timer.C:
			default:
Karthik Bala's avatar
Karthik Bala committed
189
			}
Steven Allen's avatar
Steven Allen committed
190
191
192
193
194
195
		}
		delay := o.arrivalTime.Sub(time.Now())
		if delay >= 0 {
			timer.Reset(delay)
		} else {
			timer.Reset(0)
Karthik Bala's avatar
Karthik Bala committed
196
197
		}

Steven Allen's avatar
Steven Allen committed
198
199
200
		if buffered >= bufsize {
			select {
			case <-timer.C:
201
			case <-s.reset:
202
203
204
205
206
207
208
209
				select {
				case s.reset <- struct{}{}:
				default:
				}
				return ErrReset
			}
			if err := drainBuf(); err != nil {
				return err
Steven Allen's avatar
Steven Allen committed
210
211
212
213
			}
			// write this message.
			_, err := s.write.Write(o.msg)
			if err != nil {
214
				return err
Steven Allen's avatar
Steven Allen committed
215
216
217
			}
		} else {
			buf.Write(o.msg)
Karthik Bala's avatar
Karthik Bala committed
218
		}
219
		return nil
Karthik Bala's avatar
Karthik Bala committed
220
221
222
	}

	for {
223
224
225
		// Reset takes precedent.
		select {
		case <-s.reset:
226
			s.writeErr = ErrReset
Steven Allen's avatar
Steven Allen committed
227
228
229
			return
		default:
		}
Karthik Bala's avatar
Karthik Bala committed
230

Steven Allen's avatar
Steven Allen committed
231
		select {
232
		case <-s.reset:
233
			s.writeErr = ErrReset
234
235
			return
		case <-s.close:
236
237
238
239
240
241
242
243
			if err := drainBuf(); err != nil {
				s.resetWith(err)
				return
			}
			s.writeErr = s.write.Close()
			if s.writeErr == nil {
				s.writeErr = ErrClosed
			}
244
245
			return
		case o := <-s.toDeliver:
246
247
248
249
			if err := deliverOrWait(o); err != nil {
				s.resetWith(err)
				return
			}
Steven Allen's avatar
Steven Allen committed
250
		case <-timer.C: // ok, due to write it out.
251
252
253
254
			if err := drainBuf(); err != nil {
				s.resetWith(err)
				return
			}
Karthik Bala's avatar
Karthik Bala committed
255
256
257
		}
	}
}
258
259
260
261
262
263

// resetWith records err as the stream's write error and closes both pipe
// ends with it, so pending and future pipe operations fail with err.
func (s *stream) resetWith(err error) {
	s.writeErr = err
	s.write.CloseWithError(err)
	s.read.CloseWithError(err)
}