mock_stream.go 4.58 KB
Newer Older
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
1
2
3
package mocknet

import (
Karthik Bala's avatar
Karthik Bala committed
4
	"bytes"
Steven Allen's avatar
Steven Allen committed
5
	"errors"
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
6
	"io"
7
	"net"
Karthik Bala's avatar
Karthik Bala committed
8
9
	"time"

Jeromy's avatar
Jeromy committed
10
	inet "github.com/libp2p/go-libp2p-net"
Jeromy's avatar
Jeromy committed
11
	protocol "github.com/libp2p/go-libp2p-protocol"
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
12
13
14
15
)

// stream implements inet.Stream
type stream struct {
Steven Allen's avatar
Steven Allen committed
16
17
	write     *io.PipeWriter
	read      *io.PipeReader
Karthik Bala's avatar
Karthik Bala committed
18
19
	conn      *conn
	toDeliver chan *transportObject
Steven Allen's avatar
Steven Allen committed
20
21
22
	control   chan int
	state     int
	closed    chan struct{}
23

24
	protocol protocol.ID
Karthik Bala's avatar
Karthik Bala committed
25
26
}

Steven Allen's avatar
Steven Allen committed
27
28
29
30
31
32
33
34
35
var ErrReset error = errors.New("stream reset")
var ErrClosed error = errors.New("stream closed")

const (
	stateOpen = iota
	stateClose
	stateReset
)

Karthik Bala's avatar
Karthik Bala committed
36
37
38
39
40
type transportObject struct {
	msg         []byte
	arrivalTime time.Time
}

Steven Allen's avatar
Steven Allen committed
41
func NewStream(w *io.PipeWriter, r *io.PipeReader) *stream {
Karthik Bala's avatar
Karthik Bala committed
42
	s := &stream{
Steven Allen's avatar
Steven Allen committed
43
44
45
46
		read:      r,
		write:     w,
		control:   make(chan int),
		closed:    make(chan struct{}),
Karthik Bala's avatar
Karthik Bala committed
47
48
49
		toDeliver: make(chan *transportObject),
	}

Steven Allen's avatar
Steven Allen committed
50
	go s.transport()
Karthik Bala's avatar
Karthik Bala committed
51
52
53
54
55
56
57
58
59
	return s
}

//  How to handle errors with writes?
func (s *stream) Write(p []byte) (n int, err error) {
	l := s.conn.link
	delay := l.GetLatency() + l.RateLimit(len(p))
	t := time.Now().Add(delay)
	select {
Steven Allen's avatar
Steven Allen committed
60
61
62
63
64
65
66
	case <-s.closed: // bail out if we're closing.
		switch s.state {
		case stateReset:
			return 0, ErrReset
		case stateClose:
			return 0, ErrClosed
		}
Karthik Bala's avatar
Karthik Bala committed
67
68
69
	case s.toDeliver <- &transportObject{msg: p, arrivalTime: t}:
	}
	return len(p), nil
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
70
71
}

72
func (s *stream) Protocol() protocol.ID {
73
74
75
	return s.protocol
}

76
func (s *stream) SetProtocol(proto protocol.ID) {
77
78
79
	s.protocol = proto
}

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
80
func (s *stream) Close() error {
Steven Allen's avatar
Steven Allen committed
81
82
83
84
85
86
87
88
89
90
	select {
	case s.control <- stateClose:
	case <-s.closed:
	}
	<-s.closed
	if s.state == stateReset {
		return nil
	} else {
		return ErrClosed
	}
Karthik Bala's avatar
Karthik Bala committed
91
92
}

Steven Allen's avatar
Steven Allen committed
93
94
95
func (s *stream) Reset() error {
	// Cancel any pending reads.
	s.write.Close()
Karthik Bala's avatar
Karthik Bala committed
96

Steven Allen's avatar
Steven Allen committed
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
	select {
	case s.control <- stateReset:
	case <-s.closed:
	}
	<-s.closed
	if s.state == stateReset {
		return nil
	} else {
		return ErrClosed
	}
}

func (s *stream) teardown() {
	s.write.Close()

	// at this point, no streams are writing.
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
113
	s.conn.removeStream(s)
Steven Allen's avatar
Steven Allen committed
114
115
116
117

	// Mark as closed.
	close(s.closed)

118
119
120
	s.conn.net.notifyAll(func(n inet.Notifiee) {
		n.ClosedStream(s.conn.net, s)
	})
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
121
122
123
124
125
}

func (s *stream) Conn() inet.Conn {
	return s.conn
}
Karthik Bala's avatar
Karthik Bala committed
126

127
func (s *stream) SetDeadline(t time.Time) error {
Steven Allen's avatar
Steven Allen committed
128
	return &net.OpError{Op: "set", Net: "pipe", Source: nil, Addr: nil, Err: errors.New("deadline not supported")}
129
130
}

Steven Allen's avatar
Steven Allen committed
131
132
func (p *stream) SetReadDeadline(t time.Time) error {
	return &net.OpError{Op: "set", Net: "pipe", Source: nil, Addr: nil, Err: errors.New("deadline not supported")}
133
134
}

Steven Allen's avatar
Steven Allen committed
135
136
func (p *stream) SetWriteDeadline(t time.Time) error {
	return &net.OpError{Op: "set", Net: "pipe", Source: nil, Addr: nil, Err: errors.New("deadline not supported")}
137
138
139
}

func (s *stream) Read(b []byte) (int, error) {
Steven Allen's avatar
Steven Allen committed
140
	return s.read.Read(b)
141
142
}

Karthik Bala's avatar
Karthik Bala committed
143
144
// transport will grab message arrival times, wait until that time, and
// then write the message out when it is scheduled to arrive
Steven Allen's avatar
Steven Allen committed
145
146
147
func (s *stream) transport() {
	defer s.teardown()

Karthik Bala's avatar
Karthik Bala committed
148
149
	bufsize := 256
	buf := new(bytes.Buffer)
Steven Allen's avatar
Steven Allen committed
150
151
152
153
154
155
156
157
158
159
	timer := time.NewTimer(0)
	if !timer.Stop() {
		select {
		case <-timer.C:
		default:
		}
	}

	// cleanup
	defer timer.Stop()
Karthik Bala's avatar
Karthik Bala committed
160
161
162
163
164

	// writeBuf writes the contents of buf through to the s.Writer.
	// done only when arrival time makes sense.
	drainBuf := func() {
		if buf.Len() > 0 {
Steven Allen's avatar
Steven Allen committed
165
			_, err := s.write.Write(buf.Bytes())
Karthik Bala's avatar
Karthik Bala committed
166
167
168
169
170
171
172
173
174
175
176
177
178
			if err != nil {
				return
			}
			buf.Reset()
		}
	}

	// deliverOrWait is a helper func that processes
	// an incoming packet. it waits until the arrival time,
	// and then writes things out.
	deliverOrWait := func(o *transportObject) {
		buffered := len(o.msg) + buf.Len()

Steven Allen's avatar
Steven Allen committed
179
180
181
182
183
184
185
186
187
		// Yes, we can end up extending a timer multiple times if we
		// keep on making small writes but that shouldn't be too much of an
		// issue. Fixing that would be painful.
		if !timer.Stop() {
			// FIXME: So, we *shouldn't* need to do this but we hang
			// here if we don't... Go bug?
			select {
			case <-timer.C:
			default:
Karthik Bala's avatar
Karthik Bala committed
188
			}
Steven Allen's avatar
Steven Allen committed
189
190
191
192
193
194
		}
		delay := o.arrivalTime.Sub(time.Now())
		if delay >= 0 {
			timer.Reset(delay)
		} else {
			timer.Reset(0)
Karthik Bala's avatar
Karthik Bala committed
195
196
		}

Steven Allen's avatar
Steven Allen committed
197
198
199
200
201
202
203
204
205
206
207
208
209
210
		if buffered >= bufsize {
			select {
			case <-timer.C:
			case s.state = <-s.control:
				return
			}
			drainBuf()
			// write this message.
			_, err := s.write.Write(o.msg)
			if err != nil {
				log.Error("mock_stream", err)
			}
		} else {
			buf.Write(o.msg)
Karthik Bala's avatar
Karthik Bala committed
211
212
213
214
		}
	}

	for {
Steven Allen's avatar
Steven Allen committed
215
216
217
218
219
220
221
222
223
224
225
		switch s.state {
		case stateClose:
			drainBuf()
			return
		case stateReset:
			s.read.CloseWithError(ErrReset)
			return
		default:
			panic("invalid state")
		case stateOpen:
		}
Karthik Bala's avatar
Karthik Bala committed
226

Steven Allen's avatar
Steven Allen committed
227
228
229
		select {
		case s.state = <-s.control:
			continue
Karthik Bala's avatar
Karthik Bala committed
230
231
232
233
234
		case o, ok := <-s.toDeliver:
			if !ok {
				return
			}
			deliverOrWait(o)
Steven Allen's avatar
Steven Allen committed
235
		case <-timer.C: // ok, due to write it out.
Karthik Bala's avatar
Karthik Bala committed
236
237
238
239
			drainBuf()
		}
	}
}