"...board/git@web.lueluesay.top:root/arm-trusted-firmware.git" did not exist on "7a8ef89f97720fcded620e8a18ac831281392239"
mock_stream.go 4.51 KB
Newer Older
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
1
2
3
package mocknet

import (
Karthik Bala's avatar
Karthik Bala committed
4
	"bytes"
Steven Allen's avatar
Steven Allen committed
5
	"errors"
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
6
	"io"
7
	"net"
Karthik Bala's avatar
Karthik Bala committed
8
9
	"time"

Jeromy's avatar
Jeromy committed
10
	inet "github.com/libp2p/go-libp2p-net"
Jeromy's avatar
Jeromy committed
11
	protocol "github.com/libp2p/go-libp2p-protocol"
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
12
13
14
15
)

// stream implements inet.Stream
type stream struct {
Steven Allen's avatar
Steven Allen committed
16
17
	write     *io.PipeWriter
	read      *io.PipeReader
Karthik Bala's avatar
Karthik Bala committed
18
19
	conn      *conn
	toDeliver chan *transportObject
20
21
22
23
24
25

	reset  chan struct{}
	close  chan struct{}
	closed chan struct{}

	state error
26

27
	protocol protocol.ID
Karthik Bala's avatar
Karthik Bala committed
28
29
}

Steven Allen's avatar
Steven Allen committed
30
31
32
var ErrReset error = errors.New("stream reset")
var ErrClosed error = errors.New("stream closed")

Karthik Bala's avatar
Karthik Bala committed
33
34
35
36
37
type transportObject struct {
	msg         []byte
	arrivalTime time.Time
}

Steven Allen's avatar
Steven Allen committed
38
func NewStream(w *io.PipeWriter, r *io.PipeReader) *stream {
Karthik Bala's avatar
Karthik Bala committed
39
	s := &stream{
Steven Allen's avatar
Steven Allen committed
40
41
		read:      r,
		write:     w,
42
43
		reset:     make(chan struct{}, 1),
		close:     make(chan struct{}, 1),
Steven Allen's avatar
Steven Allen committed
44
		closed:    make(chan struct{}),
Karthik Bala's avatar
Karthik Bala committed
45
46
47
		toDeliver: make(chan *transportObject),
	}

Steven Allen's avatar
Steven Allen committed
48
	go s.transport()
Karthik Bala's avatar
Karthik Bala committed
49
50
51
52
53
54
55
56
57
	return s
}

//  How to handle errors with writes?
func (s *stream) Write(p []byte) (n int, err error) {
	l := s.conn.link
	delay := l.GetLatency() + l.RateLimit(len(p))
	t := time.Now().Add(delay)
	select {
Steven Allen's avatar
Steven Allen committed
58
	case <-s.closed: // bail out if we're closing.
59
		return 0, s.state
Karthik Bala's avatar
Karthik Bala committed
60
61
62
	case s.toDeliver <- &transportObject{msg: p, arrivalTime: t}:
	}
	return len(p), nil
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
63
64
}

65
func (s *stream) Protocol() protocol.ID {
66
67
68
	return s.protocol
}

69
func (s *stream) SetProtocol(proto protocol.ID) {
70
71
72
	s.protocol = proto
}

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
73
func (s *stream) Close() error {
Steven Allen's avatar
Steven Allen committed
74
	select {
75
76
	case s.close <- struct{}{}:
	default:
Steven Allen's avatar
Steven Allen committed
77
78
	}
	<-s.closed
79
80
	if s.state != ErrClosed {
		return s.state
Steven Allen's avatar
Steven Allen committed
81
	}
82
	return nil
Karthik Bala's avatar
Karthik Bala committed
83
84
}

Steven Allen's avatar
Steven Allen committed
85
func (s *stream) Reset() error {
86
	// Cancel any pending writes.
Steven Allen's avatar
Steven Allen committed
87
	s.write.Close()
Karthik Bala's avatar
Karthik Bala committed
88

Steven Allen's avatar
Steven Allen committed
89
	select {
90
91
	case s.reset <- struct{}{}:
	default:
Steven Allen's avatar
Steven Allen committed
92
93
	}
	<-s.closed
94
95
	if s.state != ErrReset {
		return s.state
Steven Allen's avatar
Steven Allen committed
96
	}
97
	return nil
Steven Allen's avatar
Steven Allen committed
98
99
100
101
102
103
}

func (s *stream) teardown() {
	s.write.Close()

	// at this point, no streams are writing.
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
104
	s.conn.removeStream(s)
Steven Allen's avatar
Steven Allen committed
105
106
107
108

	// Mark as closed.
	close(s.closed)

109
110
111
	s.conn.net.notifyAll(func(n inet.Notifiee) {
		n.ClosedStream(s.conn.net, s)
	})
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
112
113
114
115
116
}

func (s *stream) Conn() inet.Conn {
	return s.conn
}
Karthik Bala's avatar
Karthik Bala committed
117

118
func (s *stream) SetDeadline(t time.Time) error {
Steven Allen's avatar
Steven Allen committed
119
	return &net.OpError{Op: "set", Net: "pipe", Source: nil, Addr: nil, Err: errors.New("deadline not supported")}
120
121
}

122
func (s *stream) SetReadDeadline(t time.Time) error {
Steven Allen's avatar
Steven Allen committed
123
	return &net.OpError{Op: "set", Net: "pipe", Source: nil, Addr: nil, Err: errors.New("deadline not supported")}
124
125
}

126
func (s *stream) SetWriteDeadline(t time.Time) error {
Steven Allen's avatar
Steven Allen committed
127
	return &net.OpError{Op: "set", Net: "pipe", Source: nil, Addr: nil, Err: errors.New("deadline not supported")}
128
129
130
}

func (s *stream) Read(b []byte) (int, error) {
Steven Allen's avatar
Steven Allen committed
131
	return s.read.Read(b)
132
133
}

Karthik Bala's avatar
Karthik Bala committed
134
135
// transport will grab message arrival times, wait until that time, and
// then write the message out when it is scheduled to arrive
Steven Allen's avatar
Steven Allen committed
136
137
138
func (s *stream) transport() {
	defer s.teardown()

Karthik Bala's avatar
Karthik Bala committed
139
140
	bufsize := 256
	buf := new(bytes.Buffer)
Steven Allen's avatar
Steven Allen committed
141
142
143
144
145
146
147
148
149
150
	timer := time.NewTimer(0)
	if !timer.Stop() {
		select {
		case <-timer.C:
		default:
		}
	}

	// cleanup
	defer timer.Stop()
Karthik Bala's avatar
Karthik Bala committed
151
152
153
154
155

	// writeBuf writes the contents of buf through to the s.Writer.
	// done only when arrival time makes sense.
	drainBuf := func() {
		if buf.Len() > 0 {
Steven Allen's avatar
Steven Allen committed
156
			_, err := s.write.Write(buf.Bytes())
Karthik Bala's avatar
Karthik Bala committed
157
158
159
160
161
162
163
164
165
166
167
168
169
			if err != nil {
				return
			}
			buf.Reset()
		}
	}

	// deliverOrWait is a helper func that processes
	// an incoming packet. it waits until the arrival time,
	// and then writes things out.
	deliverOrWait := func(o *transportObject) {
		buffered := len(o.msg) + buf.Len()

Steven Allen's avatar
Steven Allen committed
170
171
172
173
174
175
176
177
178
		// Yes, we can end up extending a timer multiple times if we
		// keep on making small writes but that shouldn't be too much of an
		// issue. Fixing that would be painful.
		if !timer.Stop() {
			// FIXME: So, we *shouldn't* need to do this but we hang
			// here if we don't... Go bug?
			select {
			case <-timer.C:
			default:
Karthik Bala's avatar
Karthik Bala committed
179
			}
Steven Allen's avatar
Steven Allen committed
180
181
182
183
184
185
		}
		delay := o.arrivalTime.Sub(time.Now())
		if delay >= 0 {
			timer.Reset(delay)
		} else {
			timer.Reset(0)
Karthik Bala's avatar
Karthik Bala committed
186
187
		}

Steven Allen's avatar
Steven Allen committed
188
189
190
		if buffered >= bufsize {
			select {
			case <-timer.C:
191
192
			case <-s.reset:
				s.reset <- struct{}{}
Steven Allen's avatar
Steven Allen committed
193
194
195
196
197
198
199
200
201
202
				return
			}
			drainBuf()
			// write this message.
			_, err := s.write.Write(o.msg)
			if err != nil {
				log.Error("mock_stream", err)
			}
		} else {
			buf.Write(o.msg)
Karthik Bala's avatar
Karthik Bala committed
203
204
205
206
		}
	}

	for {
207
208
209
210
		// Reset takes precedent.
		select {
		case <-s.reset:
			s.state = ErrReset
Steven Allen's avatar
Steven Allen committed
211
212
213
214
			s.read.CloseWithError(ErrReset)
			return
		default:
		}
Karthik Bala's avatar
Karthik Bala committed
215

Steven Allen's avatar
Steven Allen committed
216
		select {
217
218
219
220
221
222
223
224
225
		case <-s.reset:
			s.state = ErrReset
			s.read.CloseWithError(ErrReset)
			return
		case <-s.close:
			s.state = ErrClosed
			drainBuf()
			return
		case o := <-s.toDeliver:
Karthik Bala's avatar
Karthik Bala committed
226
			deliverOrWait(o)
Steven Allen's avatar
Steven Allen committed
227
		case <-timer.C: // ok, due to write it out.
Karthik Bala's avatar
Karthik Bala committed
228
229
230
231
			drainBuf()
		}
	}
}