mock_stream.go 5.1 KB
Newer Older
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
1
2
3
package mocknet

import (
Karthik Bala's avatar
Karthik Bala committed
4
	"bytes"
Steven Allen's avatar
Steven Allen committed
5
	"errors"
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
6
	"io"
7
	"net"
Karthik Bala's avatar
Karthik Bala committed
8
9
	"time"

Jeromy's avatar
Jeromy committed
10
	inet "github.com/libp2p/go-libp2p-net"
Jeromy's avatar
Jeromy committed
11
	protocol "github.com/libp2p/go-libp2p-protocol"
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
12
13
14
15
)

// stream implements inet.Stream
type stream struct {
Steven Allen's avatar
Steven Allen committed
16
17
	write     *io.PipeWriter
	read      *io.PipeReader
Karthik Bala's avatar
Karthik Bala committed
18
19
	conn      *conn
	toDeliver chan *transportObject
20
21
22
23
24

	reset  chan struct{}
	close  chan struct{}
	closed chan struct{}

25
	writeErr error
26

27
	protocol protocol.ID
28
	stat     inet.Stat
Karthik Bala's avatar
Karthik Bala committed
29
30
}

Steven Allen's avatar
Steven Allen committed
31
32
33
var ErrReset error = errors.New("stream reset")
var ErrClosed error = errors.New("stream closed")

Karthik Bala's avatar
Karthik Bala committed
34
35
36
37
38
type transportObject struct {
	msg         []byte
	arrivalTime time.Time
}

39
func NewStream(w *io.PipeWriter, r *io.PipeReader, dir inet.Direction) *stream {
Karthik Bala's avatar
Karthik Bala committed
40
	s := &stream{
Steven Allen's avatar
Steven Allen committed
41
42
		read:      r,
		write:     w,
43
44
		reset:     make(chan struct{}, 1),
		close:     make(chan struct{}, 1),
Steven Allen's avatar
Steven Allen committed
45
		closed:    make(chan struct{}),
Karthik Bala's avatar
Karthik Bala committed
46
		toDeliver: make(chan *transportObject),
47
		stat:      inet.Stat{Direction: dir},
Karthik Bala's avatar
Karthik Bala committed
48
49
	}

Steven Allen's avatar
Steven Allen committed
50
	go s.transport()
Karthik Bala's avatar
Karthik Bala committed
51
52
53
54
55
56
57
58
59
	return s
}

//  How to handle errors with writes?
func (s *stream) Write(p []byte) (n int, err error) {
	l := s.conn.link
	delay := l.GetLatency() + l.RateLimit(len(p))
	t := time.Now().Add(delay)
	select {
Steven Allen's avatar
Steven Allen committed
60
	case <-s.closed: // bail out if we're closing.
61
		return 0, s.writeErr
Karthik Bala's avatar
Karthik Bala committed
62
63
64
	case s.toDeliver <- &transportObject{msg: p, arrivalTime: t}:
	}
	return len(p), nil
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
65
66
}

67
func (s *stream) Protocol() protocol.ID {
68
69
70
	return s.protocol
}

71
72
73
74
func (s *stream) Stat() inet.Stat {
	return s.stat
}

75
func (s *stream) SetProtocol(proto protocol.ID) {
76
77
78
	s.protocol = proto
}

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
79
func (s *stream) Close() error {
Steven Allen's avatar
Steven Allen committed
80
	select {
81
82
	case s.close <- struct{}{}:
	default:
Steven Allen's avatar
Steven Allen committed
83
84
	}
	<-s.closed
85
86
	if s.writeErr != ErrClosed {
		return s.writeErr
Steven Allen's avatar
Steven Allen committed
87
	}
88
	return nil
Karthik Bala's avatar
Karthik Bala committed
89
90
}

Steven Allen's avatar
Steven Allen committed
91
func (s *stream) Reset() error {
92
93
94
	// Cancel any pending reads/writes with an error.
	s.write.CloseWithError(ErrReset)
	s.read.CloseWithError(ErrReset)
Karthik Bala's avatar
Karthik Bala committed
95

Steven Allen's avatar
Steven Allen committed
96
	select {
97
98
	case s.reset <- struct{}{}:
	default:
Steven Allen's avatar
Steven Allen committed
99
100
	}
	<-s.closed
101
102

	// No meaningful error case here.
103
	return nil
Steven Allen's avatar
Steven Allen committed
104
105
106
107
}

func (s *stream) teardown() {
	// at this point, no streams are writing.
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
108
	s.conn.removeStream(s)
Steven Allen's avatar
Steven Allen committed
109
110
111
112

	// Mark as closed.
	close(s.closed)

113
114
115
	s.conn.net.notifyAll(func(n inet.Notifiee) {
		n.ClosedStream(s.conn.net, s)
	})
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
116
117
118
119
120
}

func (s *stream) Conn() inet.Conn {
	return s.conn
}
Karthik Bala's avatar
Karthik Bala committed
121

122
func (s *stream) SetDeadline(t time.Time) error {
Steven Allen's avatar
Steven Allen committed
123
	return &net.OpError{Op: "set", Net: "pipe", Source: nil, Addr: nil, Err: errors.New("deadline not supported")}
124
125
}

126
func (s *stream) SetReadDeadline(t time.Time) error {
Steven Allen's avatar
Steven Allen committed
127
	return &net.OpError{Op: "set", Net: "pipe", Source: nil, Addr: nil, Err: errors.New("deadline not supported")}
128
129
}

130
func (s *stream) SetWriteDeadline(t time.Time) error {
Steven Allen's avatar
Steven Allen committed
131
	return &net.OpError{Op: "set", Net: "pipe", Source: nil, Addr: nil, Err: errors.New("deadline not supported")}
132
133
134
}

func (s *stream) Read(b []byte) (int, error) {
Steven Allen's avatar
Steven Allen committed
135
	return s.read.Read(b)
136
137
}

Karthik Bala's avatar
Karthik Bala committed
138
139
// transport will grab message arrival times, wait until that time, and
// then write the message out when it is scheduled to arrive
Steven Allen's avatar
Steven Allen committed
140
141
142
func (s *stream) transport() {
	defer s.teardown()

Karthik Bala's avatar
Karthik Bala committed
143
144
	bufsize := 256
	buf := new(bytes.Buffer)
Steven Allen's avatar
Steven Allen committed
145
146
147
148
149
150
151
152
153
154
	timer := time.NewTimer(0)
	if !timer.Stop() {
		select {
		case <-timer.C:
		default:
		}
	}

	// cleanup
	defer timer.Stop()
Karthik Bala's avatar
Karthik Bala committed
155
156
157

	// writeBuf writes the contents of buf through to the s.Writer.
	// done only when arrival time makes sense.
158
	drainBuf := func() error {
Karthik Bala's avatar
Karthik Bala committed
159
		if buf.Len() > 0 {
Steven Allen's avatar
Steven Allen committed
160
			_, err := s.write.Write(buf.Bytes())
Karthik Bala's avatar
Karthik Bala committed
161
			if err != nil {
162
				return err
Karthik Bala's avatar
Karthik Bala committed
163
164
165
			}
			buf.Reset()
		}
166
		return nil
Karthik Bala's avatar
Karthik Bala committed
167
168
169
170
171
	}

	// deliverOrWait is a helper func that processes
	// an incoming packet. it waits until the arrival time,
	// and then writes things out.
172
	deliverOrWait := func(o *transportObject) error {
Karthik Bala's avatar
Karthik Bala committed
173
174
		buffered := len(o.msg) + buf.Len()

Steven Allen's avatar
Steven Allen committed
175
176
177
178
179
180
181
182
183
		// Yes, we can end up extending a timer multiple times if we
		// keep on making small writes but that shouldn't be too much of an
		// issue. Fixing that would be painful.
		if !timer.Stop() {
			// FIXME: So, we *shouldn't* need to do this but we hang
			// here if we don't... Go bug?
			select {
			case <-timer.C:
			default:
Karthik Bala's avatar
Karthik Bala committed
184
			}
Steven Allen's avatar
Steven Allen committed
185
186
187
188
189
190
		}
		delay := o.arrivalTime.Sub(time.Now())
		if delay >= 0 {
			timer.Reset(delay)
		} else {
			timer.Reset(0)
Karthik Bala's avatar
Karthik Bala committed
191
192
		}

Steven Allen's avatar
Steven Allen committed
193
194
195
		if buffered >= bufsize {
			select {
			case <-timer.C:
196
			case <-s.reset:
197
198
199
200
201
202
203
204
				select {
				case s.reset <- struct{}{}:
				default:
				}
				return ErrReset
			}
			if err := drainBuf(); err != nil {
				return err
Steven Allen's avatar
Steven Allen committed
205
206
207
208
			}
			// write this message.
			_, err := s.write.Write(o.msg)
			if err != nil {
209
				return err
Steven Allen's avatar
Steven Allen committed
210
211
212
			}
		} else {
			buf.Write(o.msg)
Karthik Bala's avatar
Karthik Bala committed
213
		}
214
		return nil
Karthik Bala's avatar
Karthik Bala committed
215
216
217
	}

	for {
218
219
220
		// Reset takes precedent.
		select {
		case <-s.reset:
221
			s.writeErr = ErrReset
Steven Allen's avatar
Steven Allen committed
222
223
224
			return
		default:
		}
Karthik Bala's avatar
Karthik Bala committed
225

Steven Allen's avatar
Steven Allen committed
226
		select {
227
		case <-s.reset:
228
			s.writeErr = ErrReset
229
230
			return
		case <-s.close:
231
232
233
234
235
236
237
238
			if err := drainBuf(); err != nil {
				s.resetWith(err)
				return
			}
			s.writeErr = s.write.Close()
			if s.writeErr == nil {
				s.writeErr = ErrClosed
			}
239
240
			return
		case o := <-s.toDeliver:
241
242
243
244
			if err := deliverOrWait(o); err != nil {
				s.resetWith(err)
				return
			}
Steven Allen's avatar
Steven Allen committed
245
		case <-timer.C: // ok, due to write it out.
246
247
248
249
			if err := drainBuf(); err != nil {
				s.resetWith(err)
				return
			}
Karthik Bala's avatar
Karthik Bala committed
250
251
252
		}
	}
}
253
254
255
256
257
258

func (s *stream) resetWith(err error) {
	s.write.CloseWithError(err)
	s.read.CloseWithError(err)
	s.writeErr = err
}