heartbeat.go 2.43 KB
Newer Older
Jeromy's avatar
Jeromy committed
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
package ext

// XXX: There's no logging around heartbeats - how can we do this in a way that is useful
// as a library?
//
// XXX: When we close the session because of a lost heartbeat or because of an error in the
// heartbeating, there is no way to tell that; a Session will just appear to stop working

import (
	proto "QmfEm573cZeq3LpgccZMpngV6dXbm5gfU23F5nNUuhSxxJ/muxado/proto"
	"QmfEm573cZeq3LpgccZMpngV6dXbm5gfU23F5nNUuhSxxJ/muxado/proto/frame"
	"encoding/binary"
	"io"
	"time"
)

const (
	defaultHeartbeatInterval  = 3 * time.Second
	defaultHeartbeatTolerance = 10 * time.Second
)

type Heartbeat struct {
	sess   proto.ISession
	accept proto.ExtAccept

	mark      chan int
	interval  time.Duration
	tolerance time.Duration
}

func NewDefaultHeartbeat() *Heartbeat {
	return NewHeartbeat(defaultHeartbeatInterval, defaultHeartbeatTolerance)
}

func NewHeartbeat(interval, tolerance time.Duration) *Heartbeat {
	return &Heartbeat{
		mark:      make(chan int),
		interval:  interval,
		tolerance: tolerance,
	}
}

func (h *Heartbeat) Start(sess proto.ISession, accept proto.ExtAccept) frame.StreamType {
	h.sess = sess
	h.accept = accept
	go h.respond()
	go h.request()
	go h.check()

	return heartbeatExtensionType
}

func (h *Heartbeat) check() {
	t := time.NewTimer(h.interval + h.tolerance)

	for {
		select {
		case <-t.C:
			// time out waiting for a response!
			h.sess.Close()
			return

		case <-h.mark:
			t.Reset(h.interval + h.tolerance)
		}
	}
}

func (h *Heartbeat) respond() {
	// close the session on any errors
	defer h.sess.Close()

	stream, err := h.accept()
	if err != nil {
		return
	}

	// read the next heartbeat id and respond
	buf := make([]byte, 4)
	for {
		_, err := io.ReadFull(stream, buf)
		if err != nil {
			return
		}

		_, err = stream.Write(buf)
		if err != nil {
			return
		}
	}
}

func (h *Heartbeat) request() {
	// close the session on any errors
	defer h.sess.Close()

	// request highest possible priority for heartbeats
	priority := frame.StreamPriority(0x7FFFFFFF)
	stream, err := h.sess.OpenStream(priority, heartbeatExtensionType, false)
	if err != nil {
		return
	}

	// send heartbeats and then check that we got them back
	var id uint32
	for {
		time.Sleep(h.interval)

		if err := binary.Write(stream, binary.BigEndian, id); err != nil {
			return
		}

		var respId uint32
		if err := binary.Read(stream, binary.BigEndian, &respId); err != nil {
			return
		}

		if id != respId {
			return
		}

		// record the time
		h.mark <- 1
	}
}