conn.go 2.59 KB
Newer Older
1
2
3
4
5
package libp2pquic

import (
	"fmt"
	"net"
6
	"sync"
7
8
9
10
11
12
13
14
15

	smux "github.com/jbenet/go-stream-muxer"
	tpt "github.com/libp2p/go-libp2p-transport"
	quic "github.com/lucas-clemente/quic-go"
	ma "github.com/multiformats/go-multiaddr"
	manet "github.com/multiformats/go-multiaddr-net"
)

type quicConn struct {
16
17
	mutex sync.RWMutex

18
19
20
21
22
	sess      quic.Session
	transport tpt.Transport

	laddr ma.Multiaddr
	raddr ma.Multiaddr
23
24

	closed bool
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
}

var _ tpt.Conn = &quicConn{}
var _ tpt.MultiStreamConn = &quicConn{}

func newQuicConn(sess quic.Session, t tpt.Transport) (*quicConn, error) {
	// analogues to manet.WrapNetConn
	laddr, err := quicMultiAddress(sess.LocalAddr())
	if err != nil {
		return nil, fmt.Errorf("failed to convert nconn.LocalAddr: %s", err)
	}

	// analogues to manet.WrapNetConn
	raddr, err := quicMultiAddress(sess.RemoteAddr())
	if err != nil {
		return nil, fmt.Errorf("failed to convert nconn.RemoteAddr: %s", err)
	}

43
	c := &quicConn{
44
45
46
47
		sess:      sess,
		laddr:     laddr,
		raddr:     raddr,
		transport: t,
48
49
50
51
52
	}

	go c.watchClosed()

	return c, nil
53
54
55
56
57
58
59
60
61
62
}

func (c *quicConn) AcceptStream() (smux.Stream, error) {
	str, err := c.sess.AcceptStream()
	if err != nil {
		return nil, err
	}
	return &quicStream{Stream: str}, nil
}

63
64
// OpenStream opens a new stream
// It blocks until a new stream can be opened (when limited by the QUIC maximum stream limit)
65
func (c *quicConn) OpenStream() (smux.Stream, error) {
66
	str, err := c.sess.OpenStreamSync()
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
	if err != nil {
		return nil, err
	}
	return &quicStream{Stream: str}, nil
}

func (c *quicConn) Serve(handler smux.StreamHandler) {
	for { // accept loop
		s, err := c.AcceptStream()
		if err != nil {
			return // err always means closed.
		}
		go handler(s)
	}
}

func (c *quicConn) Close() error {
	return c.sess.Close(nil)
}

87
88
89
90
91
92
93
func (c *quicConn) watchClosed() {
	c.sess.WaitUntilClosed()
	c.mutex.Lock()
	c.closed = true
	c.mutex.Unlock()
}

94
func (c *quicConn) IsClosed() bool {
95
96
97
	c.mutex.Lock()
	defer c.mutex.Unlock()
	return c.closed
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
}

func (c *quicConn) LocalAddr() net.Addr {
	return c.sess.LocalAddr()
}

func (c *quicConn) LocalMultiaddr() ma.Multiaddr {
	return c.laddr
}

func (c *quicConn) RemoteAddr() net.Addr {
	return c.sess.RemoteAddr()
}

func (c *quicConn) RemoteMultiaddr() ma.Multiaddr {
	return c.raddr
}

func (c *quicConn) Transport() tpt.Transport {
	return c.transport
}

// TODO: there must be a better way to do this
func quicMultiAddress(na net.Addr) (ma.Multiaddr, error) {
	udpMA, err := manet.FromNetAddr(na)
	if err != nil {
		return nil, err
	}
	quicMA, err := ma.NewMultiaddr(udpMA.String() + "/quic")
	if err != nil {
		return nil, err
	}
	return quicMA, nil
}