Unverified Commit d048f057 authored by Marten Seemann's avatar Marten Seemann
Browse files

add support for multi-stream connections

parent 24731153
package libp2pquic
import (
"fmt"
"net"
smux "github.com/jbenet/go-stream-muxer"
tpt "github.com/libp2p/go-libp2p-transport"
quic "github.com/lucas-clemente/quic-go"
ma "github.com/multiformats/go-multiaddr"
manet "github.com/multiformats/go-multiaddr-net"
)
type quicConn struct {
sess quic.Session
transport tpt.Transport
laddr ma.Multiaddr
raddr ma.Multiaddr
}
var _ tpt.Conn = &quicConn{}
var _ tpt.MultiStreamConn = &quicConn{}
func newQuicConn(sess quic.Session, t tpt.Transport) (*quicConn, error) {
// analogues to manet.WrapNetConn
laddr, err := quicMultiAddress(sess.LocalAddr())
if err != nil {
return nil, fmt.Errorf("failed to convert nconn.LocalAddr: %s", err)
}
// analogues to manet.WrapNetConn
raddr, err := quicMultiAddress(sess.RemoteAddr())
if err != nil {
return nil, fmt.Errorf("failed to convert nconn.RemoteAddr: %s", err)
}
return &quicConn{
sess: sess,
laddr: laddr,
raddr: raddr,
transport: t,
}, nil
}
func (c *quicConn) AcceptStream() (smux.Stream, error) {
str, err := c.sess.AcceptStream()
if err != nil {
return nil, err
}
return &quicStream{Stream: str}, nil
}
func (c *quicConn) OpenStream() (smux.Stream, error) {
str, err := c.sess.OpenStream()
if err != nil {
return nil, err
}
return &quicStream{Stream: str}, nil
}
func (c *quicConn) Serve(handler smux.StreamHandler) {
for { // accept loop
s, err := c.AcceptStream()
if err != nil {
return // err always means closed.
}
go handler(s)
}
}
func (c *quicConn) Close() error {
return c.sess.Close(nil)
}
// TODO: implement this
func (c *quicConn) IsClosed() bool {
return false
}
func (c *quicConn) LocalAddr() net.Addr {
return c.sess.LocalAddr()
}
func (c *quicConn) LocalMultiaddr() ma.Multiaddr {
return c.laddr
}
func (c *quicConn) RemoteAddr() net.Addr {
return c.sess.RemoteAddr()
}
func (c *quicConn) RemoteMultiaddr() ma.Multiaddr {
return c.raddr
}
func (c *quicConn) Transport() tpt.Transport {
return c.transport
}
// TODO: there must be a better way to do this
func quicMultiAddress(na net.Addr) (ma.Multiaddr, error) {
udpMA, err := manet.FromNetAddr(na)
if err != nil {
return nil, err
}
quicMA, err := ma.NewMultiaddr(udpMA.String() + "/quic")
if err != nil {
return nil, err
}
return quicMA, nil
}
package libp2pquic
import (
"errors"
"net"
smux "github.com/jbenet/go-stream-muxer"
quic "github.com/lucas-clemente/quic-go"
"github.com/lucas-clemente/quic-go/protocol"
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
)
type mockStream struct {
id protocol.StreamID
}
func (s *mockStream) Close() error { return nil }
func (s *mockStream) Reset(error) { return }
func (s *mockStream) Read([]byte) (int, error) { return 0, nil }
func (s *mockStream) Write([]byte) (int, error) { return 0, nil }
func (s *mockStream) StreamID() protocol.StreamID { return s.id }
var _ quic.Stream = &mockStream{}
type mockQuicSession struct {
closed bool
localAddr net.Addr
remoteAddr net.Addr
streamToAccept quic.Stream
streamAcceptErr error
streamToOpen quic.Stream
streamOpenErr error
}
var _ quic.Session = &mockQuicSession{}
func (s *mockQuicSession) AcceptStream() (quic.Stream, error) {
return s.streamToAccept, s.streamAcceptErr
}
func (s *mockQuicSession) OpenStream() (quic.Stream, error) { return s.streamToOpen, s.streamOpenErr }
func (s *mockQuicSession) OpenStreamSync() (quic.Stream, error) { return s.streamToOpen, nil }
func (s *mockQuicSession) Close(error) error { s.closed = true; return nil }
func (s *mockQuicSession) LocalAddr() net.Addr { return s.localAddr }
func (s *mockQuicSession) RemoteAddr() net.Addr { return s.remoteAddr }
var _ = Describe("Conn", func() {
var (
conn *quicConn
sess *mockQuicSession
)
BeforeEach(func() {
sess = &mockQuicSession{
localAddr: &net.UDPAddr{IP: net.IPv4(127, 0, 0, 1), Port: 1337},
remoteAddr: &net.UDPAddr{IP: net.IPv4(192, 168, 13, 37), Port: 1234},
}
var err error
conn, err = newQuicConn(sess, nil)
Expect(err).ToNot(HaveOccurred())
})
It("has the correct local address", func() {
Expect(conn.LocalAddr()).To(Equal(sess.localAddr))
Expect(conn.LocalMultiaddr().String()).To(Equal("/ip4/127.0.0.1/udp/1337/quic"))
})
It("has the correct remote address", func() {
Expect(conn.RemoteAddr()).To(Equal(sess.remoteAddr))
Expect(conn.RemoteMultiaddr().String()).To(Equal("/ip4/192.168.13.37/udp/1234/quic"))
})
It("closes", func() {
err := conn.Close()
Expect(err).ToNot(HaveOccurred())
Expect(sess.closed).To(BeTrue())
})
Context("opening streams", func() {
It("opens streams", func() {
s := &mockStream{id: 1337}
sess.streamToOpen = s
str, err := conn.OpenStream()
Expect(err).ToNot(HaveOccurred())
Expect(str.(*quicStream).Stream).To(Equal(s))
})
It("errors when it can't open a stream", func() {
testErr := errors.New("stream open err")
sess.streamOpenErr = testErr
_, err := conn.OpenStream()
Expect(err).To(MatchError(testErr))
})
})
Context("accepting streams", func() {
It("accepts streams", func() {
s := &mockStream{id: 1337}
sess.streamToAccept = s
str, err := conn.AcceptStream()
Expect(err).ToNot(HaveOccurred())
Expect(str.(*quicStream).Stream).To(Equal(s))
})
It("errors when it can't open a stream", func() {
testErr := errors.New("stream open err")
sess.streamAcceptErr = testErr
_, err := conn.AcceptStream()
Expect(err).To(MatchError(testErr))
})
})
Context("serving", func() {
var (
handler func(smux.Stream)
handlerCalled bool
handlerCalledWith smux.Stream
)
BeforeEach(func() {
handlerCalled = false
handlerCalledWith = nil
handler = func(s smux.Stream) {
handlerCalledWith = s
handlerCalled = true
}
})
It("calls the handler", func() {
str := &mockStream{id: 5}
sess.streamToAccept = str
var returned bool
go func() {
conn.Serve(handler)
returned = true
}()
Eventually(func() bool { return handlerCalled }).Should(BeTrue())
Expect(handlerCalledWith.(*quicStream).Stream).To(Equal(str))
// make the go-routine return
sess.streamAcceptErr = errors.New("stop test")
})
It("returns when accepting a stream errors", func() {
sess.streamAcceptErr = errors.New("accept err")
var returned bool
go func() {
conn.Serve(handler)
returned = true
}()
Eventually(func() bool { return returned }).Should(BeTrue())
Expect(handlerCalled).To(BeFalse())
})
})
})
......@@ -5,7 +5,7 @@ import (
"crypto/tls"
tpt "github.com/libp2p/go-libp2p-transport"
quicconn "github.com/marten-seemann/quic-conn"
quic "github.com/lucas-clemente/quic-go"
ma "github.com/multiformats/go-multiaddr"
manet "github.com/multiformats/go-multiaddr-net"
"github.com/whyrusleeping/mafmt"
......@@ -15,6 +15,8 @@ type dialer struct {
transport tpt.Transport
}
var _ tpt.Dialer = &dialer{}
func newDialer(transport tpt.Transport) (*dialer, error) {
return &dialer{
transport: transport,
......@@ -28,28 +30,20 @@ func (d *dialer) Dial(raddr ma.Multiaddr) (tpt.Conn, error) {
if err != nil {
return nil, err
}
c, err := quicconn.Dial(host, tlsConf)
if err != nil {
return nil, err
}
mnc, err := manet.WrapNetConn(c)
qsess, err := quic.DialAddr(host, &quic.Config{TLSConfig: tlsConf})
if err != nil {
return nil, err
}
return &tpt.ConnWrap{
Conn: mnc,
Tpt: d.transport,
}, nil
return newQuicConn(qsess, d.transport)
}
func (d *dialer) DialContext(ctx context.Context, raddr ma.Multiaddr) (tpt.Conn, error) {
// TODO: implement the ctx
return d.Dial(raddr)
}
func (d *dialer) Matches(a ma.Multiaddr) bool {
return mafmt.QUIC.Matches(a)
}
var _ tpt.Dialer = &dialer{}
......@@ -4,53 +4,50 @@ import (
"net"
tpt "github.com/libp2p/go-libp2p-transport"
quic "github.com/lucas-clemente/quic-go"
testdata "github.com/lucas-clemente/quic-go/testdata"
quicconn "github.com/marten-seemann/quic-conn"
ma "github.com/multiformats/go-multiaddr"
manet "github.com/multiformats/go-multiaddr-net"
)
type listener struct {
laddr ma.Multiaddr
quicListener net.Listener
quicListener quic.Listener
transport tpt.Transport
}
func newListener(laddr ma.Multiaddr, transport tpt.Transport) (*listener, error) {
// we need to provide a certificate here
// use the demo certificate from quic-go
tlsConf := testdata.GetTLSConfig()
network, host, err := manet.DialArgs(laddr)
var _ tpt.Listener = &listener{}
func newListener(laddr ma.Multiaddr, t tpt.Transport) (*listener, error) {
qconf := &quic.Config{
// we need to provide a certificate here
// use the demo certificate from quic-go
TLSConfig: testdata.GetTLSConfig(),
}
_, host, err := manet.DialArgs(laddr)
if err != nil {
return nil, err
}
qln, err := quicconn.Listen(network, host, tlsConf)
qln, err := quic.ListenAddr(host, qconf)
if err != nil {
return nil, err
}
return &listener{
laddr: laddr,
quicListener: qln,
transport: transport,
transport: t,
}, nil
}
func (l *listener) Accept() (tpt.Conn, error) {
c, err := l.quicListener.Accept()
sess, err := l.quicListener.Accept()
if err != nil {
return nil, err
}
mnc, err := manet.WrapNetConn(c)
if err != nil {
return nil, err
}
return &tpt.ConnWrap{
Conn: mnc,
Tpt: l.transport,
}, nil
return newQuicConn(sess, l.transport)
}
func (l *listener) Close() error {
......@@ -64,5 +61,3 @@ func (l *listener) Addr() net.Addr {
func (l *listener) Multiaddr() ma.Multiaddr {
return l.laddr
}
var _ tpt.Listener = &listener{}
......@@ -5,48 +5,42 @@ import (
"net"
tpt "github.com/libp2p/go-libp2p-transport"
quic "github.com/lucas-clemente/quic-go"
ma "github.com/multiformats/go-multiaddr"
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
)
type mockNetListener struct {
type mockQuicListener struct {
connToAccept net.Conn
acceptErr error
serveErr error
closed bool
}
func (m *mockNetListener) Accept() (net.Conn, error) {
if m.acceptErr != nil {
return nil, m.acceptErr
}
return m.connToAccept, nil
sessionToAccept quic.Session
acceptErr error
}
func (m *mockNetListener) Close() error {
m.closed = true
return nil
}
var _ quic.Listener = &mockQuicListener{}
func (m *mockQuicListener) Close() error { m.closed = true; return nil }
func (m *mockQuicListener) Accept() (quic.Session, error) { return m.sessionToAccept, m.acceptErr }
func (m *mockQuicListener) Addr() net.Addr { panic("not implemented") }
func (m *mockNetListener) Addr() net.Addr {
panic("not implemented")
}
var _ net.Listener = &mockNetListener{}
var _ = Describe("Listener", func() {
var (
l *listener
netListener *mockNetListener
transport tpt.Transport
l *listener
quicListener *mockQuicListener
transport tpt.Transport
)
BeforeEach(func() {
netListener = &mockNetListener{}
quicListener = &mockQuicListener{}
transport = &QuicTransport{}
l = &listener{
quicListener: netListener,
quicListener: quicListener,
transport: transport,
}
})
......@@ -70,34 +64,13 @@ var _ = Describe("Listener", func() {
It("closes", func() {
err := l.Close()
Expect(err).ToNot(HaveOccurred())
Expect(netListener.closed).To(BeTrue())
Expect(quicListener.closed).To(BeTrue())
})
Context("accepting", func() {
It("accepts a new conn", func() {
remoteAddr, err := net.ResolveUDPAddr("udp", "127.0.0.1:1234")
Expect(err).ToNot(HaveOccurred())
localAddr, err := net.ResolveUDPAddr("udp", "127.0.0.1:4321")
Expect(err).ToNot(HaveOccurred())
udpConn, err := net.DialUDP("udp", localAddr, remoteAddr)
netListener.connToAccept = udpConn
conn, err := l.Accept()
Expect(err).ToNot(HaveOccurred())
Expect(conn.LocalMultiaddr().String()).To(Equal("/ip4/127.0.0.1/udp/4321"))
Expect(conn.RemoteMultiaddr().String()).To(Equal("/ip4/127.0.0.1/udp/1234"))
Expect(conn.Transport()).To(Equal(transport))
})
It("errors if it can't read the muliaddresses of a conn", func() {
netListener.connToAccept = &net.UDPConn{}
_, err := l.Accept()
Expect(err).To(HaveOccurred())
Expect(err.Error()).To(ContainSubstring("nil multiaddr"))
})
It("errors if no connection can be accepted", func() {
testErr := errors.New("test error")
netListener.acceptErr = testErr
quicListener.acceptErr = testErr
_, err := l.Accept()
Expect(err).To(MatchError(testErr))
})
......
package libp2pquic
import (
"time"
smux "github.com/jbenet/go-stream-muxer"
quic "github.com/lucas-clemente/quic-go"
)
// The quicStream is a very thin wrapper for a quic.Stream, adding some methods
// required to fulfill the smux.Stream interface
// TODO: this can be removed once the quic.Stream supports deadlines (quic-go#514)
type quicStream struct {
quic.Stream
}
var _ smux.Stream = &quicStream{}
func (s *quicStream) SetDeadline(time.Time) error {
return nil
}
func (s *quicStream) SetReadDeadline(time.Time) error {
return nil
}
func (s *quicStream) SetWriteDeadline(time.Time) error {
return nil
}
......@@ -18,6 +18,8 @@ type QuicTransport struct {
dialers map[string]tpt.Dialer
}
var _ tpt.Transport = &QuicTransport{}
// NewQuicTransport creates a new QUIC Transport
// it tracks dialers and listeners created
func NewQuicTransport() *QuicTransport {
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment