package libp2pquic

import (
	"fmt"
	"net"
	"sync"

	tpt "github.com/libp2p/go-libp2p-transport"
	smux "github.com/libp2p/go-stream-muxer"
	quic "github.com/lucas-clemente/quic-go"
	ma "github.com/multiformats/go-multiaddr"
	manet "github.com/multiformats/go-multiaddr-net"
)

type quicConn struct {
16
17
	mutex sync.RWMutex

18
19
20
21
22
	sess      quic.Session
	transport tpt.Transport

	laddr ma.Multiaddr
	raddr ma.Multiaddr
23
24

	closed bool
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
}

// Compile-time checks that *quicConn satisfies the transport interfaces.
var (
	_ tpt.Conn            = (*quicConn)(nil)
	_ tpt.MultiStreamConn = (*quicConn)(nil)
)

func newQuicConn(sess quic.Session, t tpt.Transport) (*quicConn, error) {
	// analogues to manet.WrapNetConn
	laddr, err := quicMultiAddress(sess.LocalAddr())
	if err != nil {
		return nil, fmt.Errorf("failed to convert nconn.LocalAddr: %s", err)
	}

	// analogues to manet.WrapNetConn
	raddr, err := quicMultiAddress(sess.RemoteAddr())
	if err != nil {
		return nil, fmt.Errorf("failed to convert nconn.RemoteAddr: %s", err)
	}

43
	c := &quicConn{
44
45
46
47
		sess:      sess,
		laddr:     laddr,
		raddr:     raddr,
		transport: t,
48
49
50
51
	}
	go c.watchClosed()

	return c, nil
52
53
54
}

// AcceptStream forwards to AcceptStream on the underlying QUIC session and
// returns its stream and error unchanged.
func (c *quicConn) AcceptStream() (smux.Stream, error) {
	return c.sess.AcceptStream()
}

58
59
// OpenStream opens a new stream
// It blocks until a new stream can be opened (when limited by the QUIC maximum stream limit)
60
func (c *quicConn) OpenStream() (smux.Stream, error) {
61
	return c.sess.OpenStreamSync()
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
}

// Serve runs the accept loop for this connection: every stream returned by
// AcceptStream is passed to handler on its own goroutine. Serve returns when
// AcceptStream fails.
func (c *quicConn) Serve(handler smux.StreamHandler) {
	for {
		stream, acceptErr := c.AcceptStream()
		if acceptErr != nil {
			// Any accept error is treated as the connection having closed.
			return
		}
		go handler(stream)
	}
}

// Close closes the underlying QUIC session, passing nil as its argument, and
// returns whatever error the session reports.
func (c *quicConn) Close() error {
	return c.sess.Close(nil)
}

78
79
80
81
82
83
84
func (c *quicConn) watchClosed() {
	c.sess.WaitUntilClosed()
	c.mutex.Lock()
	c.closed = true
	c.mutex.Unlock()
}

85
func (c *quicConn) IsClosed() bool {
86
87
88
	c.mutex.Lock()
	defer c.mutex.Unlock()
	return c.closed
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
}

// LocalAddr returns the local network address reported by the underlying
// session.
func (c *quicConn) LocalAddr() net.Addr {
	return c.sess.LocalAddr()
}

// LocalMultiaddr returns the local multiaddr stored on the connection.
func (c *quicConn) LocalMultiaddr() ma.Multiaddr {
	return c.laddr
}

// RemoteAddr returns the remote network address reported by the underlying
// session.
func (c *quicConn) RemoteAddr() net.Addr {
	return c.sess.RemoteAddr()
}

// RemoteMultiaddr returns the remote multiaddr stored on the connection.
func (c *quicConn) RemoteMultiaddr() ma.Multiaddr {
	return c.raddr
}

// Transport returns the transport this connection was created with.
func (c *quicConn) Transport() tpt.Transport {
	return c.transport
}

// quicMultiAddress converts a net.Addr into a multiaddr with a "/quic"
// component appended. The address is first converted with manet.FromNetAddr,
// then its string form plus the suffix is re-parsed. An error from either
// step is returned as-is, with a nil multiaddr.
//
// TODO: there must be a better way to do this
func quicMultiAddress(na net.Addr) (ma.Multiaddr, error) {
	const quicSuffix = "/quic"

	base, err := manet.FromNetAddr(na)
	if err != nil {
		return nil, err
	}

	withQuic, err := ma.NewMultiaddr(base.String() + quicSuffix)
	if err != nil {
		return nil, err
	}
	return withQuic, nil
}