writer.go 4.42 KB
Newer Older
Jeromy's avatar
Jeromy committed
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
package log

import (
	"fmt"
	"io"
	"sync"
)

var MaxWriterBuffer = 512 * 1024

var log = Logger("eventlog")

type MirrorWriter struct {
	active   bool
	activelk sync.Mutex

	// channel for incoming writers
	writerAdd chan *writerAdd

	// slices of writer/sync-channel pairs
	writers []*bufWriter

	// synchronization channel for incoming writes
	msgSync chan []byte
}

type writerSync struct {
	w  io.WriteCloser
	br chan []byte
}

func NewMirrorWriter() *MirrorWriter {
	mw := &MirrorWriter{
		msgSync:   make(chan []byte, 64), // sufficiently large buffer to avoid callers waiting
		writerAdd: make(chan *writerAdd),
	}

	go mw.logRoutine()

	return mw
}

func (mw *MirrorWriter) Write(b []byte) (int, error) {
	mycopy := make([]byte, len(b))
	copy(mycopy, b)
	mw.msgSync <- mycopy
	return len(b), nil
}

func (mw *MirrorWriter) Close() error {
	// it is up to the caller to ensure that write is not called during or
	// after close is called.
	close(mw.msgSync)
	return nil
}

func (mw *MirrorWriter) doClose() {
	for _, w := range mw.writers {
		w.writer.Close()
	}
}

func (mw *MirrorWriter) logRoutine() {
	// rebind to avoid races on nilling out struct fields
	msgSync := mw.msgSync
	writerAdd := mw.writerAdd

	defer mw.doClose()

	for {
		select {
		case b, ok := <-msgSync:
			if !ok {
				return
			}

			// write to all writers
			dropped := mw.broadcastMessage(b)

			// consolidate the slice
			if dropped {
				mw.clearDeadWriters()
			}
		case wa := <-writerAdd:
			mw.writers = append(mw.writers, newBufWriter(wa.w))

			mw.activelk.Lock()
			mw.active = true
			mw.activelk.Unlock()
			close(wa.done)
		}
	}
}

// broadcastMessage sends the given message to every writer
// if any writer is killed during the send, 'true' is returned
func (mw *MirrorWriter) broadcastMessage(b []byte) bool {
	var dropped bool
	for i, w := range mw.writers {
		_, err := w.Write(b)
		if err != nil {
			mw.writers[i] = nil
			dropped = true
		}
	}
	return dropped
}

func (mw *MirrorWriter) clearDeadWriters() {
	writers := mw.writers
	mw.writers = nil
	for _, w := range writers {
		if w != nil {
			mw.writers = append(mw.writers, w)
		}
	}
	if len(mw.writers) == 0 {
		mw.activelk.Lock()
		mw.active = false
		mw.activelk.Unlock()
	}
}

type writerAdd struct {
	w    io.WriteCloser
	done chan struct{}
}

func (mw *MirrorWriter) AddWriter(w io.WriteCloser) {
	wa := &writerAdd{
		w:    w,
		done: make(chan struct{}),
	}
	mw.writerAdd <- wa
	<-wa.done
}

func (mw *MirrorWriter) Active() (active bool) {
	mw.activelk.Lock()
	active = mw.active
	mw.activelk.Unlock()
	return
}

func newBufWriter(w io.WriteCloser) *bufWriter {
	bw := &bufWriter{
		writer:   w,
		incoming: make(chan []byte, 1),
	}

	go bw.loop()
	return bw
}

type bufWriter struct {
	writer io.WriteCloser

	incoming chan []byte

	deathLock sync.Mutex
	dead      bool
}

var errDeadWriter = fmt.Errorf("writer is dead")

func (bw *bufWriter) Write(b []byte) (int, error) {
	bw.deathLock.Lock()
	dead := bw.dead
	bw.deathLock.Unlock()
	if dead {
		if bw.incoming != nil {
			close(bw.incoming)
			bw.incoming = nil
		}
		return 0, errDeadWriter
	}

	bw.incoming <- b
	return len(b), nil
}

func (bw *bufWriter) die() {
	bw.deathLock.Lock()
	bw.dead = true
	bw.writer.Close()
	bw.deathLock.Unlock()
}

func (bw *bufWriter) loop() {
	bufsize := 0
	bufBase := make([][]byte, 0, 16) // some initial memory
	buffered := bufBase
	nextCh := make(chan []byte)

	var nextMsg []byte

	go func() {
		for b := range nextCh {
			_, err := bw.writer.Write(b)
			if err != nil {
				log.Info("eventlog write error: %s", err)
				bw.die()
				return
			}
		}
	}()

	// collect and buffer messages
	incoming := bw.incoming
	for {
		if nextMsg == nil || nextCh == nil {
			// nextCh == nil implies we are 'dead' and draining the incoming channel
			// until the caller notices and closes it for us
			select {
			case b, ok := <-incoming:
				if !ok {
					return
				}
				nextMsg = b
			}
		}

		select {
		case b, ok := <-incoming:
			if !ok {
				return
			}
			bufsize += len(b)
			buffered = append(buffered, b)
			if bufsize > MaxWriterBuffer {
				// if we have too many messages buffered, kill the writer
				bw.die()
				close(nextCh)
				nextCh = nil
				// explicity keep going here to drain incoming
			}
		case nextCh <- nextMsg:
			nextMsg = nil
			if len(buffered) > 0 {
				nextMsg = buffered[0]
				buffered = buffered[1:]
				bufsize -= len(nextMsg)
			}

			if len(buffered) == 0 {
				// reset slice position
				buffered = bufBase[:0]
			}
		}
	}
}