inbound.go 2.64 KB
Newer Older
Jeromy's avatar
Jeromy committed
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
package buffer

import (
	"errors"
	"io"
	"io/ioutil"
	"sync"
	"time"
)

// AlreadyClosed is the error ReadFrom reports when an error has already been
// set on the buffer, so the incoming data was discarded instead of stored.
var AlreadyClosed = errors.New("Buffer already closed")

// waitingReader is the hand-off slot used for direct writes: a blocked Read
// parks its destination slice here so that ReadFrom can fill it without going
// through the circular buffer.
type waitingReader struct {
	// buf is the destination slice of the parked reader; nil means no reader
	// is currently registered.
	buf []byte
	// n is the number of bytes ReadFrom placed directly into buf.
	n   int
}

// Inbound is a specialized concurrent circular buffer intended to buffer a stream's inbound data with the following properties:
// - Minimizes copies by skipping the buffer if a write occurs while a reader is waiting
// - Provides a mechanism to time out reads after a deadline
// - Provides a mechanism to set an 'error' that will fail reads when the buffer is empty
type Inbound struct {
	// Circular holds data that could not be handed directly to a reader.
	*Circular
	// Cond supplies the lock (L) guarding every field below and the
	// Wait/Broadcast used to park and wake readers.
	*sync.Cond
	// err, once non-nil, fails reads when no buffered data is left and makes
	// ReadFrom discard its input.
	err error
	// waitingReader is the single direct-write slot shared by all readers.
	waitingReader
	// deadline is the read timeout; the zero value means no timeout.
	deadline time.Time
	// timer wakes waiters when deadline passes; nil until SetDeadline is called.
	timer    *time.Timer
}

// NewInbound builds an Inbound whose circular storage is created by
// NewCircular(size) and whose condition variable is backed by a fresh mutex.
// No deadline or error is set on the returned buffer.
func NewInbound(size int) *Inbound {
	inbound := new(Inbound)
	inbound.Circular = NewCircular(size)
	inbound.Cond = sync.NewCond(&sync.Mutex{})
	return inbound
}

// SetDeadline sets the time after which blocked and future Read calls fail
// with a timeout error. A zero t clears the deadline, so reads wait
// indefinitely again. Any wake-up scheduled for a previous deadline is
// cancelled.
func (b *Inbound) SetDeadline(t time.Time) {
	b.L.Lock()
	defer b.L.Unlock()

	b.deadline = t

	// A previously scheduled wake-up refers to the old deadline.
	if b.timer != nil {
		b.timer.Stop()
		b.timer = nil
	}

	// Read skips the timeout check for a zero deadline, so there is nothing
	// to wake readers up for.
	if t.IsZero() {
		return
	}

	// Once the deadline passes, wake every waiter so it re-checks the
	// deadline. The lock is held around Broadcast so the wake-up cannot slip
	// in between a reader's deadline check and its call to Wait, which would
	// leave that reader asleep past the deadline. A deadline already in the
	// past gives a non-positive delay, so the callback runs right away (it
	// then blocks on the lock until this method returns).
	delay := t.Sub(time.Now())
	b.timer = time.AfterFunc(delay, func() {
		b.L.Lock()
		b.Broadcast()
		b.L.Unlock()
	})
}

// SetError stores err on the buffer and wakes every waiter so blocked reads
// can observe it. Read returns the stored error once no buffered data is
// left; ReadFrom discards its input while the stored error is non-nil.
func (b *Inbound) SetError(err error) {
	b.L.Lock()
	defer b.L.Unlock()

	b.err = err
	b.Broadcast()
}

// GetError returns the error currently stored on the buffer, or nil when
// none has been set. The value is read under the buffer's lock.
func (b *Inbound) GetError() (err error) {
	b.L.Lock()
	defer b.L.Unlock()
	return b.err
}

// ReadFrom moves data from rd into the buffer and wakes waiting readers.
//
// If a reader is parked with a registered destination slice, data is first
// read straight into that slice; whatever follows goes into the circular
// buffer. The returned n is the sum of both parts.
//
// If an error has already been set on the buffer, rd is drained and
// discarded and AlreadyClosed is returned (or the drain error, if draining
// itself fails).
func (b *Inbound) ReadFrom(rd io.Reader) (n int, err error) {
	b.L.Lock()

	if b.err != nil {
		// Drop the lock before draining so a slow rd does not block other
		// users of the buffer.
		b.L.Unlock()
		// Stream the unwanted data to Discard instead of collecting all of
		// it in memory first.
		if _, err = io.Copy(ioutil.Discard, rd); err != nil {
			return 0, err
		}
		return 0, AlreadyClosed
	}

	// Every remaining exit path may have delivered data: wake the waiters
	// first, then release the lock.
	defer func() {
		b.Broadcast()
		b.L.Unlock()
	}()

	// write directly to a reader's buffer, if possible
	if b.waitingReader.buf != nil {
		b.waitingReader.n, err = readInto(rd, b.waitingReader.buf)
		n = b.waitingReader.n
		// The slot is consumed whether or not any bytes were delivered.
		b.waitingReader.buf = nil
		if err == io.EOF {
			// EOF is not an error: rd simply had nothing more to give.
			return n, nil
		}
		if err != nil {
			return n, err
		}
	}

	// write the rest to buffer
	var buffered int
	buffered, err = b.Circular.ReadFrom(rd)
	n += buffered
	return n, err
}

// Read fills p with inbound data, blocking until one of the following
// happens (checked in this order on every wake-up):
//   - a writer placed data directly into p: returns that byte count;
//   - the deadline has passed: returns a "Read timeout" error;
//   - the circular buffer yields data: returns that byte count;
//   - an error has been set on the buffer: returns that error.
//
// While blocked, p is registered as the direct-write target if the slot is
// free. The registration is withdrawn before returning so a later ReadFrom
// never writes into a slice the caller has already got back.
//
// NOTE(review): the direct-write slot is a single shared field, so this looks
// designed for one reader at a time - confirm before using it from several
// goroutines concurrently.
func (b *Inbound) Read(p []byte) (n int, err error) {
	b.L.Lock()

	// wait is non-nil once this call has registered p for a direct write.
	var wait *waitingReader

	for {
		// we got a direct write to our buffer
		if wait != nil && wait.n != 0 {
			n = wait.n
			break
		}

		// check for timeout
		if !b.deadline.IsZero() && time.Now().After(b.deadline) {
			err = errors.New("Read timeout")
			break
		}

		// try to read from the buffer; only the byte count is used here and
		// the error result is deliberately dropped (presumably it only
		// signals an empty buffer - TODO confirm against Circular.Read)
		n, _ = b.Circular.Read(p)

		// successfully read some data
		if n != 0 {
			break
		}

		// there's an error
		if b.err != nil {
			err = b.err
			break
		}

		// register for a direct write
		if b.waitingReader.buf == nil {
			wait = &b.waitingReader
			wait.buf = p
			wait.n = 0
		}

		// no data, wait
		b.Wait()
	}

	// Leaving without a direct write (timeout, error, or data taken from the
	// circular buffer): withdraw the registration. Otherwise the next
	// ReadFrom would write into p after this call has returned, and those
	// bytes would never be delivered to any reader.
	if wait != nil && wait.n == 0 {
		wait.buf = nil
	}

	b.L.Unlock()
	return
}