lazy.go 1.85 KB
Newer Older
Jeromy's avatar
Jeromy committed
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
package multistream

import (
	"fmt"
	"io"
	"sync"
)

type Multistream interface {
	io.ReadWriteCloser
	Protocol() string
}

func NewMSSelect(c io.ReadWriteCloser, proto string) Multistream {
	return NewMultistream(NewMultistream(c, ProtocolID), proto)
}

func NewMultistream(c io.ReadWriteCloser, proto string) Multistream {
	return &lazyConn{
		proto: proto,
		con:   c,
	}
}

type lazyConn struct {
	rhandshake bool // only accessed by 'Read' should not call read async

	rhlock sync.Mutex
	rhsync bool //protected by mutex
	rerr   error

	whandshake bool

	whlock sync.Mutex
	whsync bool
	werr   error

	proto string
	con   io.ReadWriteCloser
}

func (l *lazyConn) Protocol() string {
	return l.proto
}

func (l *lazyConn) Read(b []byte) (int, error) {
	if !l.rhandshake {
		go l.writeHandshake()
		err := l.readHandshake()
		if err != nil {
			return 0, err
		}

		l.rhandshake = true
	}

	if len(b) == 0 {
		return 0, nil
	}

	return l.con.Read(b)
}

func (l *lazyConn) readHandshake() error {
	l.rhlock.Lock()
	defer l.rhlock.Unlock()

	// if we've already done this, exit
	if l.rhsync {
		return l.rerr
	}
	l.rhsync = true

	// read protocol
	tok, err := ReadNextToken(l.con)
	if err != nil {
		l.rerr = err
		return err
	}

	if tok != l.proto {
		l.rerr = fmt.Errorf("protocol mismatch in lazy handshake ( %s != %s )", tok, l.proto)
		return l.rerr
	}

	return nil
}

func (l *lazyConn) writeHandshake() error {
	l.whlock.Lock()
	defer l.whlock.Unlock()

	if l.whsync {
		return l.werr
	}

	l.whsync = true

	err := delimWrite(l.con, []byte(l.proto))
	if err != nil {
		l.werr = err
		return err
	}

	return nil
}

func (l *lazyConn) Write(b []byte) (int, error) {
	if !l.whandshake {
		go l.readHandshake()
		err := l.writeHandshake()
		if err != nil {
			return 0, err
		}

		l.whandshake = true
	}

	return l.con.Write(b)
}

func (l *lazyConn) Close() error {
	return l.con.Close()
}