Commit c8b9d442 authored by Marten Seemann's avatar Marten Seemann
Browse files

refactor for transport interface changes

parent 151e46bb
...@@ -5,3 +5,8 @@ ...@@ -5,3 +5,8 @@
[![Code Coverage](https://img.shields.io/codecov/c/github/marten-seemann/libp2p-quic-transport/master.svg?style=flat-square)](https://codecov.io/gh/marten-seemann/libp2p-quic-transport/) [![Code Coverage](https://img.shields.io/codecov/c/github/marten-seemann/libp2p-quic-transport/master.svg?style=flat-square)](https://codecov.io/gh/marten-seemann/libp2p-quic-transport/)
This is an implementation of the [libp2p transport](https://github.com/libp2p/go-libp2p-transport/blob/master/transport.go) and the [libp2p stream muxer](https://github.com/libp2p/go-stream-muxer) using QUIC. This is an implementation of the [libp2p transport](https://github.com/libp2p/go-libp2p-transport/blob/master/transport.go) and the [libp2p stream muxer](https://github.com/libp2p/go-stream-muxer) using QUIC.
## Known limitations
* currently only works with RSA host keys
* [#2](https://github.com/marten-seemann/libp2p-quic-transport/issues/2)
package libp2pquic package libp2pquic
import ( import (
"fmt"
"net" "net"
"sync"
ic "github.com/libp2p/go-libp2p-crypto"
peer "github.com/libp2p/go-libp2p-peer"
tpt "github.com/libp2p/go-libp2p-transport" tpt "github.com/libp2p/go-libp2p-transport"
smux "github.com/libp2p/go-stream-muxer" smux "github.com/libp2p/go-stream-muxer"
quic "github.com/lucas-clemente/quic-go" quic "github.com/lucas-clemente/quic-go"
...@@ -12,96 +12,78 @@ import ( ...@@ -12,96 +12,78 @@ import (
manet "github.com/multiformats/go-multiaddr-net" manet "github.com/multiformats/go-multiaddr-net"
) )
type quicConn struct { type conn struct {
mutex sync.RWMutex
sess quic.Session sess quic.Session
transport tpt.Transport transport tpt.Transport
laddr ma.Multiaddr localPeer peer.ID
raddr ma.Multiaddr privKey ic.PrivKey
localMultiaddr ma.Multiaddr
closed bool remotePeerID peer.ID
remotePubKey ic.PubKey
remoteMultiaddr ma.Multiaddr
} }
var _ tpt.Conn = &quicConn{} var _ tpt.Conn = &conn{}
var _ tpt.MultiplexConn = &quicConn{}
func newQuicConn(sess quic.Session, t tpt.Transport) (*quicConn, error) {
// analogues to manet.WrapNetConn
laddr, err := quicMultiAddress(sess.LocalAddr())
if err != nil {
return nil, fmt.Errorf("failed to convert nconn.LocalAddr: %s", err)
}
// analogues to manet.WrapNetConn func (c *conn) Close() error {
raddr, err := quicMultiAddress(sess.RemoteAddr()) return c.sess.Close(nil)
if err != nil {
return nil, fmt.Errorf("failed to convert nconn.RemoteAddr: %s", err)
}
c := &quicConn{
sess: sess,
laddr: laddr,
raddr: raddr,
transport: t,
}
go c.watchClosed()
return c, nil
} }
func (c *quicConn) AcceptStream() (smux.Stream, error) { // IsClosed returns whether a connection is fully closed.
str, err := c.sess.AcceptStream() func (c *conn) IsClosed() bool {
return &stream{str}, err return c.sess.Context().Err() != nil
} }
// OpenStream opens a new stream // OpenStream creates a new stream.
// It blocks until a new stream can be opened (when limited by the QUIC maximum stream limit) func (c *conn) OpenStream() (smux.Stream, error) {
func (c *quicConn) OpenStream() (smux.Stream, error) { qstr, err := c.sess.OpenStreamSync()
str, err := c.sess.OpenStreamSync() return &stream{Stream: qstr}, err
return &stream{str}, err
} }
func (c *quicConn) Close() error { // AcceptStream accepts a stream opened by the other side.
return c.sess.Close(nil) func (c *conn) AcceptStream() (smux.Stream, error) {
qstr, err := c.sess.AcceptStream()
return &stream{Stream: qstr}, err
} }
func (c *quicConn) watchClosed() { // LocalPeer returns our peer ID
<-c.sess.Context().Done() func (c *conn) LocalPeer() peer.ID {
c.mutex.Lock() return c.localPeer
c.closed = true
c.mutex.Unlock()
} }
func (c *quicConn) IsClosed() bool { // LocalPrivateKey returns our private key
c.mutex.Lock() func (c *conn) LocalPrivateKey() ic.PrivKey {
defer c.mutex.Unlock() return c.privKey
return c.closed
} }
func (c *quicConn) LocalAddr() net.Addr { // RemotePeer returns the peer ID of the remote peer.
return c.sess.LocalAddr() func (c *conn) RemotePeer() peer.ID {
return c.remotePeerID
} }
func (c *quicConn) LocalMultiaddr() ma.Multiaddr { // RemotePublicKey returns the public key of the remote peer.
return c.laddr func (c *conn) RemotePublicKey() ic.PubKey {
return c.remotePubKey
} }
func (c *quicConn) RemoteAddr() net.Addr { // LocalMultiaddr returns the local Multiaddr associated
return c.sess.RemoteAddr() func (c *conn) LocalMultiaddr() ma.Multiaddr {
return c.localMultiaddr
} }
func (c *quicConn) RemoteMultiaddr() ma.Multiaddr { // RemoteMultiaddr returns the remote Multiaddr associated
return c.raddr func (c *conn) RemoteMultiaddr() ma.Multiaddr {
return c.remoteMultiaddr
} }
func (c *quicConn) Transport() tpt.Transport { func (c *conn) Transport() tpt.Transport {
return c.transport return c.transport
} }
// TODO: there must be a better way to do this // TODO: there must be a better way to do this
func quicMultiAddress(na net.Addr) (ma.Multiaddr, error) { func quicMultiaddr(na net.Addr) (ma.Multiaddr, error) {
udpMA, err := manet.FromNetAddr(na) udpMA, err := manet.FromNetAddr(na)
if err != nil { if err != nil {
return nil, err return nil, err
......
...@@ -2,133 +2,92 @@ package libp2pquic ...@@ -2,133 +2,92 @@ package libp2pquic
import ( import (
"context" "context"
"errors" "crypto/rand"
"net" "crypto/rsa"
"time" "crypto/x509"
quic "github.com/lucas-clemente/quic-go" ic "github.com/libp2p/go-libp2p-crypto"
peer "github.com/libp2p/go-libp2p-peer"
tpt "github.com/libp2p/go-libp2p-transport"
ma "github.com/multiformats/go-multiaddr"
. "github.com/onsi/ginkgo" . "github.com/onsi/ginkgo"
. "github.com/onsi/gomega" . "github.com/onsi/gomega"
) )
type mockStream struct { var _ = Describe("Connection", func() {
id quic.StreamID
}
func (s *mockStream) Close() error { return nil }
func (s *mockStream) Reset(error) { return }
func (s *mockStream) Read([]byte) (int, error) { return 0, nil }
func (s *mockStream) Write([]byte) (int, error) { return 0, nil }
func (s *mockStream) StreamID() quic.StreamID { return s.id }
func (s *mockStream) SetReadDeadline(time.Time) error { panic("not implemented") }
func (s *mockStream) SetWriteDeadline(time.Time) error { panic("not implemented") }
func (s *mockStream) SetDeadline(time.Time) error { panic("not implemented") }
func (s *mockStream) Context() context.Context { panic("not implemented") }
var _ quic.Stream = &mockStream{}
type mockQuicSession struct {
closed bool
context context.Context
localAddr net.Addr
remoteAddr net.Addr
streamToAccept quic.Stream
streamAcceptErr error
streamToOpen quic.Stream
streamOpenErr error
}
var _ quic.Session = &mockQuicSession{}
func (s *mockQuicSession) AcceptStream() (quic.Stream, error) {
return s.streamToAccept, s.streamAcceptErr
}
func (s *mockQuicSession) OpenStream() (quic.Stream, error) { return s.streamToOpen, s.streamOpenErr }
func (s *mockQuicSession) OpenStreamSync() (quic.Stream, error) {
return s.streamToOpen, s.streamOpenErr
}
func (s *mockQuicSession) Close(error) error { s.closed = true; return nil }
func (s *mockQuicSession) LocalAddr() net.Addr { return s.localAddr }
func (s *mockQuicSession) RemoteAddr() net.Addr { return s.remoteAddr }
func (s *mockQuicSession) Context() context.Context { return s.context }
var _ = Describe("Conn", func() {
var ( var (
conn *quicConn serverKey, clientKey ic.PrivKey
sess *mockQuicSession serverID, clientID peer.ID
ctxCancel context.CancelFunc
) )
BeforeEach(func() { createPeer := func() ic.PrivKey {
var ctx context.Context key, err := rsa.GenerateKey(rand.Reader, 1024)
ctx, ctxCancel = context.WithCancel(context.Background())
sess = &mockQuicSession{
localAddr: &net.UDPAddr{IP: net.IPv4(127, 0, 0, 1), Port: 1337},
remoteAddr: &net.UDPAddr{IP: net.IPv4(192, 168, 13, 37), Port: 1234},
context: ctx,
}
var err error
conn, err = newQuicConn(sess, nil)
Expect(err).ToNot(HaveOccurred()) Expect(err).ToNot(HaveOccurred())
}) priv, err := ic.UnmarshalRsaPrivateKey(x509.MarshalPKCS1PrivateKey(key))
Expect(err).ToNot(HaveOccurred())
It("has the correct local address", func() { return priv
Expect(conn.LocalAddr()).To(Equal(sess.localAddr)) }
Expect(conn.LocalMultiaddr().String()).To(Equal("/ip4/127.0.0.1/udp/1337/quic"))
})
It("has the correct remote address", func() {
Expect(conn.RemoteAddr()).To(Equal(sess.remoteAddr))
Expect(conn.RemoteMultiaddr().String()).To(Equal("/ip4/192.168.13.37/udp/1234/quic"))
})
It("closes", func() { runServer := func() (<-chan ma.Multiaddr, <-chan tpt.Conn) {
err := conn.Close() serverTransport, err := NewTransport(serverKey)
Expect(err).ToNot(HaveOccurred()) Expect(err).ToNot(HaveOccurred())
Expect(sess.closed).To(BeTrue()) addrChan := make(chan ma.Multiaddr)
}) connChan := make(chan tpt.Conn)
go func() {
defer GinkgoRecover()
addr, err := ma.NewMultiaddr("/ip4/127.0.0.1/udp/0/quic")
Expect(err).ToNot(HaveOccurred())
ln, err := serverTransport.Listen(addr)
Expect(err).ToNot(HaveOccurred())
addrChan <- ln.Multiaddr()
conn, err := ln.Accept()
Expect(err).ToNot(HaveOccurred())
connChan <- conn
}()
return addrChan, connChan
}
It("says if it is closed", func() { BeforeEach(func() {
Consistently(func() bool { return conn.IsClosed() }).Should(BeFalse()) var err error
ctxCancel() serverKey = createPeer()
Eventually(func() bool { return conn.IsClosed() }).Should(BeTrue()) serverID, err = peer.IDFromPrivateKey(serverKey)
Expect(err).ToNot(HaveOccurred())
clientKey = createPeer()
clientID, err = peer.IDFromPrivateKey(clientKey)
Expect(err).ToNot(HaveOccurred())
}) })
Context("opening streams", func() { It("handshakes", func() {
It("opens streams", func() { serverAddrChan, serverConnChan := runServer()
s := &mockStream{id: 1337} clientTransport, err := NewTransport(clientKey)
sess.streamToOpen = s Expect(err).ToNot(HaveOccurred())
str, err := conn.OpenStream() serverAddr := <-serverAddrChan
Expect(err).ToNot(HaveOccurred()) conn, err := clientTransport.Dial(context.Background(), serverAddr, serverID)
Expect(str.(*stream).Stream).To(Equal(s)) Expect(err).ToNot(HaveOccurred())
}) serverConn := <-serverConnChan
Expect(conn.LocalPeer()).To(Equal(clientID))
It("errors when it can't open a stream", func() { Expect(conn.LocalPrivateKey()).To(Equal(clientKey))
testErr := errors.New("stream open err") Expect(conn.RemotePeer()).To(Equal(serverID))
sess.streamOpenErr = testErr Expect(conn.RemotePublicKey()).To(Equal(serverKey.GetPublic()))
_, err := conn.OpenStream() Expect(serverConn.LocalPeer()).To(Equal(serverID))
Expect(err).To(MatchError(testErr)) Expect(serverConn.LocalPrivateKey()).To(Equal(serverKey))
}) Expect(serverConn.RemotePeer()).To(Equal(clientID))
Expect(serverConn.RemotePublicKey()).To(Equal(clientKey.GetPublic()))
}) })
Context("accepting streams", func() { It("fails if the peer ID doesn't match", func() {
It("accepts streams", func() { thirdPartyID, err := peer.IDFromPrivateKey(createPeer())
s := &mockStream{id: 1337} Expect(err).ToNot(HaveOccurred())
sess.streamToAccept = s
str, err := conn.AcceptStream()
Expect(err).ToNot(HaveOccurred())
Expect(str.(*stream).Stream).To(Equal(s))
})
It("errors when it can't open a stream", func() { serverAddrChan, _ := runServer()
testErr := errors.New("stream open err") clientTransport, err := NewTransport(clientKey)
sess.streamAcceptErr = testErr Expect(err).ToNot(HaveOccurred())
_, err := conn.AcceptStream() serverAddr := <-serverAddrChan
Expect(err).To(MatchError(testErr)) // dial, but expect the wrong peer ID
}) _, err = clientTransport.Dial(context.Background(), serverAddr, thirdPartyID)
Expect(err).To(MatchError("peer IDs don't match"))
// TODO(#2): don't accept a connection if the client's peer verification fails
// Consistently(serverConnChan).ShouldNot(Receive())
}) })
}) })
package libp2pquic
import (
"crypto/ecdsa"
"crypto/elliptic"
"crypto/rand"
"crypto/tls"
"crypto/x509"
"errors"
"math/big"
"time"
"github.com/lucas-clemente/quic-go"
"github.com/gogo/protobuf/proto"
ic "github.com/libp2p/go-libp2p-crypto"
pb "github.com/libp2p/go-libp2p-crypto/pb"
)
// mint certificate selection is broken.
const hostname = "quic.ipfs"
type connectionStater interface {
ConnectionState() quic.ConnectionState
}
// TODO: make this private
func GenerateConfig(privKey ic.PrivKey) (*tls.Config, error) {
key, hostCert, err := keyToCertificate(privKey)
if err != nil {
return nil, err
}
// The ephemeral key used just for a couple of connections (or a limited time).
ephemeralKey, err := ecdsa.GenerateKey(elliptic.P256(), rand.Reader)
if err != nil {
return nil, err
}
// Sign the ephemeral key using the host key.
// This is the only time that the host's private key of the peer is needed.
// Note that this step could be done asynchronously, such that a running node doesn't need access its private key at all.
certTemplate := &x509.Certificate{
DNSNames: []string{hostname},
SerialNumber: big.NewInt(1),
NotBefore: time.Now().Add(-24 * time.Hour),
NotAfter: time.Now().Add(30 * 24 * time.Hour),
}
certDER, err := x509.CreateCertificate(rand.Reader, certTemplate, hostCert, ephemeralKey.Public(), key)
if err != nil {
return nil, err
}
cert, err := x509.ParseCertificate(certDER)
if err != nil {
return nil, err
}
return &tls.Config{
ServerName: hostname,
InsecureSkipVerify: true, // This is not insecure here. We will verify the cert chain ourselves.
ClientAuth: tls.RequireAnyClientCert,
Certificates: []tls.Certificate{{
Certificate: [][]byte{cert.Raw, hostCert.Raw},
PrivateKey: ephemeralKey,
}},
}, nil
}
func getRemotePubKey(conn connectionStater) (ic.PubKey, error) {
certChain := conn.ConnectionState().PeerCertificates
if len(certChain) != 2 {
return nil, errors.New("expected 2 certificates in the chain")
}
pool := x509.NewCertPool()
pool.AddCert(certChain[1])
if _, err := certChain[0].Verify(x509.VerifyOptions{Roots: pool}); err != nil {
return nil, err
}
remotePubKey, err := x509.MarshalPKIXPublicKey(certChain[1].PublicKey)
if err != nil {
return nil, err
}
return ic.UnmarshalRsaPublicKey(remotePubKey)
}
func keyToCertificate(sk ic.PrivKey) (interface{}, *x509.Certificate, error) {
sn, err := rand.Int(rand.Reader, big.NewInt(1<<62))
if err != nil {
return nil, nil, err
}
tmpl := &x509.Certificate{
SerialNumber: sn,
NotBefore: time.Now().Add(-24 * time.Hour),
NotAfter: time.Now().Add(30 * 24 * time.Hour),
IsCA: true,
BasicConstraintsValid: true,
}
var publicKey, privateKey interface{}
keyBytes, err := sk.Bytes()
if err != nil {
return nil, nil, err
}
pbmes := new(pb.PrivateKey)
if err := proto.Unmarshal(keyBytes, pbmes); err != nil {
return nil, nil, err
}
switch pbmes.GetType() {
case pb.KeyType_RSA:
k, err := x509.ParsePKCS1PrivateKey(pbmes.GetData())
if err != nil {
return nil, nil, err
}
publicKey = &k.PublicKey
privateKey = k
// TODO: add support for ECDSA
default:
return nil, nil, errors.New("unsupported key type for TLS")
}
certDER, err := x509.CreateCertificate(rand.Reader, tmpl, tmpl, publicKey, privateKey)
if err != nil {
return nil, nil, err
}
cert, err := x509.ParseCertificate(certDER)
if err != nil {
return nil, nil, err
}
return privateKey, cert, nil
}
package libp2pquic
import (
"context"
"crypto/tls"
tpt "github.com/libp2p/go-libp2p-transport"
quic "github.com/lucas-clemente/quic-go"
ma "github.com/multiformats/go-multiaddr"
manet "github.com/multiformats/go-multiaddr-net"
"github.com/whyrusleeping/mafmt"
)
type dialer struct {
transport tpt.Transport
}
var _ tpt.Dialer = &dialer{}
func newDialer(transport tpt.Transport) (*dialer, error) {
return &dialer{
transport: transport,
}, nil
}
func (d *dialer) Dial(raddr ma.Multiaddr) (tpt.Conn, error) {
// TODO: check that raddr is a QUIC address
_, host, err := manet.DialArgs(raddr)
if err != nil {
return nil, err
}
qsess, err := quic.DialAddr(host, &tls.Config{InsecureSkipVerify: true}, nil)
if err != nil {
return nil, err
}
return newQuicConn(qsess, d.transport)
}
func (d *dialer) DialContext(ctx context.Context, raddr ma.Multiaddr) (tpt.Conn, error) {
// TODO: implement the ctx
return d.Dial(raddr)
}
func (d *dialer) Matches(a ma.Multiaddr) bool {
return mafmt.QUIC.Matches(a)
}
package libp2pquic
import (
tpt "github.com/libp2p/go-libp2p-transport"
ma "github.com/multiformats/go-multiaddr"
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
)
var _ = Describe("Listener", func() {
var (
d *dialer
transport tpt.Transport
)
BeforeEach(func() {
var err error
transport = &QuicTransport{}
d, err = newDialer(transport)
Expect(err).ToNot(HaveOccurred())
})
It("dials", func() {
addr, err := ma.NewMultiaddr("/ip4/127.0.0.1/udp/8888")
Expect(err).ToNot(HaveOccurred())
// start a listener to connect to
var ln *listener
go func() {
defer GinkgoRecover()
ln, err = newListener(addr, transport)
Expect(err).ToNot(HaveOccurred())
_, err = ln.Accept()
Expect(err).ToNot(HaveOccurred())
}()
Eventually(func() *listener { return ln }).ShouldNot(BeNil())
conn, err := d.Dial(addr)
Expect(err).ToNot(HaveOccurred())
Expect(conn.Transport()).To(Equal(d.transport))
})
It("matches", func() {
invalidAddr, err := ma.NewMultiaddr("/ip4/127.0.0.1/udp/1234")
Expect(err).ToNot(HaveOccurred())
validAddr, err := ma.NewMultiaddr("/ip4/127.0.0.1/udp/1234/quic")
Expect(err).ToNot(HaveOccurred())
Expect(d.Matches(invalidAddr)).To(BeFalse())
Expect(d.Matches(validAddr)).To(BeTrue())
})
})
package libp2pquic package libp2pquic
import ( import (
"crypto/rand"
"crypto/rsa"
"crypto/tls"
"crypto/x509"
"encoding/pem"
"math/big"
"net" "net"
ic "github.com/libp2p/go-libp2p-crypto"
peer "github.com/libp2p/go-libp2p-peer"
tpt "github.com/libp2p/go-libp2p-transport" tpt "github.com/libp2p/go-libp2p-transport"
quic "github.com/lucas-clemente/quic-go" quic "github.com/lucas-clemente/quic-go"
ma "github.com/multiformats/go-multiaddr" ma "github.com/multiformats/go-multiaddr"
manet "github.com/multiformats/go-multiaddr-net" manet "github.com/multiformats/go-multiaddr-net"
) )
var quicListenAddr = quic.ListenAddr
// A listener listens for QUIC connections.
type listener struct { type listener struct {
laddr ma.Multiaddr
quicListener quic.Listener quicListener quic.Listener
transport tpt.Transport
acceptQueue chan tpt.Conn
transport tpt.Transport privKey ic.PrivKey
localPeer peer.ID
localMultiaddr ma.Multiaddr
} }
var _ tpt.Listener = &listener{} var _ tpt.Listener = &listener{}
func newListener(laddr ma.Multiaddr, t tpt.Transport) (*listener, error) { func newListener(addr ma.Multiaddr, transport tpt.Transport, localPeer peer.ID, key ic.PrivKey) (tpt.Listener, error) {
_, host, err := manet.DialArgs(laddr) _, host, err := manet.DialArgs(addr)
if err != nil { if err != nil {
return nil, err return nil, err
} }
tlsConf, err := generateTLSConfig() tlsConf, err := GenerateConfig(key)
if err != nil { if err != nil {
return nil, err return nil, err
} }
qln, err := quic.ListenAddr(host, tlsConf, nil) ln, err := quicListenAddr(host, tlsConf, &quic.Config{Versions: []quic.VersionNumber{101}})
if err != nil { if err != nil {
return nil, err return nil, err
} }
addr, err := quicMultiAddress(qln.Addr()) localMultiaddr, err := quicMultiaddr(ln.Addr())
if err != nil { if err != nil {
return nil, err return nil, err
} }
return &listener{ return &listener{
laddr: addr, quicListener: ln,
quicListener: qln, transport: transport,
transport: t, privKey: key,
localPeer: localPeer,
localMultiaddr: localMultiaddr,
}, nil }, nil
} }
// Accept accepts new connections.
// TODO(#2): don't accept a connection if the client's peer verification fails
func (l *listener) Accept() (tpt.Conn, error) { func (l *listener) Accept() (tpt.Conn, error) {
sess, err := l.quicListener.Accept() sess, err := l.quicListener.Accept()
if err != nil { if err != nil {
return nil, err return nil, err
} }
return newQuicConn(sess, l.transport) remotePubKey, err := getRemotePubKey(sess)
if err != nil {
return nil, err
}
remotePeerID, err := peer.IDFromPublicKey(remotePubKey)
if err != nil {
return nil, err
}
remoteMultiaddr, err := quicMultiaddr(sess.RemoteAddr())
if err != nil {
return nil, err
}
return &conn{
sess: sess,
transport: l.transport,
localPeer: l.localPeer,
localMultiaddr: l.localMultiaddr,
privKey: l.privKey,
remoteMultiaddr: remoteMultiaddr,
remotePeerID: remotePeerID,
remotePubKey: remotePubKey,
}, nil
} }
// Close closes the listener.
func (l *listener) Close() error { func (l *listener) Close() error {
return l.quicListener.Close() return l.quicListener.Close()
} }
// Addr returns the address of this listener.
func (l *listener) Addr() net.Addr { func (l *listener) Addr() net.Addr {
return l.quicListener.Addr() return l.quicListener.Addr()
} }
// Multiaddr returns the multiaddress of this listener.
func (l *listener) Multiaddr() ma.Multiaddr { func (l *listener) Multiaddr() ma.Multiaddr {
return l.laddr return l.localMultiaddr
}
// Generate a bare-bones TLS config for the server.
// The client doesn't verify the certificate yet.
func generateTLSConfig() (*tls.Config, error) {
key, err := rsa.GenerateKey(rand.Reader, 2048)
if err != nil {
return nil, err
}
template := x509.Certificate{SerialNumber: big.NewInt(1)}
certDER, err := x509.CreateCertificate(rand.Reader, &template, &template, &key.PublicKey, key)
if err != nil {
return nil, err
}
keyPEM := pem.EncodeToMemory(&pem.Block{Type: "RSA PRIVATE KEY", Bytes: x509.MarshalPKCS1PrivateKey(key)})
certPEM := pem.EncodeToMemory(&pem.Block{Type: "CERTIFICATE", Bytes: certDER})
tlsCert, err := tls.X509KeyPair(certPEM, keyPEM)
if err != nil {
return nil, err
}
return &tls.Config{Certificates: []tls.Certificate{tlsCert}}, nil
} }
package libp2pquic package libp2pquic
import ( import (
"errors" "crypto/rand"
"crypto/rsa"
"crypto/x509"
"fmt"
"net" "net"
ic "github.com/libp2p/go-libp2p-crypto"
tpt "github.com/libp2p/go-libp2p-transport" tpt "github.com/libp2p/go-libp2p-transport"
quic "github.com/lucas-clemente/quic-go"
ma "github.com/multiformats/go-multiaddr" ma "github.com/multiformats/go-multiaddr"
. "github.com/onsi/ginkgo" . "github.com/onsi/ginkgo"
. "github.com/onsi/gomega" . "github.com/onsi/gomega"
) )
type mockQuicListener struct {
connToAccept net.Conn
serveErr error
closed bool
sessionToAccept quic.Session
acceptErr error
}
var _ quic.Listener = &mockQuicListener{}
func (m *mockQuicListener) Close() error { m.closed = true; return nil }
func (m *mockQuicListener) Accept() (quic.Session, error) { return m.sessionToAccept, m.acceptErr }
func (m *mockQuicListener) Addr() net.Addr { panic("not implemented") }
var _ = Describe("Listener", func() { var _ = Describe("Listener", func() {
var ( var (
l *listener t tpt.Transport
quicListener *mockQuicListener localAddr ma.Multiaddr
transport tpt.Transport
) )
BeforeEach(func() { BeforeEach(func() {
quicListener = &mockQuicListener{} rsaKey, err := rsa.GenerateKey(rand.Reader, 1024)
transport = &QuicTransport{} Expect(err).ToNot(HaveOccurred())
l = &listener{ key, err := ic.UnmarshalRsaPrivateKey(x509.MarshalPKCS1PrivateKey(rsaKey))
quicListener: quicListener, Expect(err).ToNot(HaveOccurred())
transport: transport, t, err = NewTransport(key)
}
})
It("returns its addr", func() {
laddr, err := ma.NewMultiaddr("/ip4/127.0.0.1/udp/0/quic")
Expect(err).ToNot(HaveOccurred()) Expect(err).ToNot(HaveOccurred())
l, err = newListener(laddr, nil) localAddr, err = ma.NewMultiaddr("/ip4/127.0.0.1/udp/0/quic")
Expect(err).ToNot(HaveOccurred()) Expect(err).ToNot(HaveOccurred())
as := l.Addr().String()
Expect(as).ToNot(Equal("127.0.0.1:0)"))
Expect(as).To(ContainSubstring("127.0.0.1:"))
}) })
It("returns its multiaddr", func() { It("returns the address it is listening on", func() {
laddr, err := ma.NewMultiaddr("/ip4/127.0.0.1/udp/0/quic") ln, err := t.Listen(localAddr)
Expect(err).ToNot(HaveOccurred()) Expect(err).ToNot(HaveOccurred())
l, err = newListener(laddr, nil) netAddr := ln.Addr()
Expect(err).ToNot(HaveOccurred()) Expect(netAddr).To(BeAssignableToTypeOf(&net.UDPAddr{}))
mas := l.Multiaddr().String() port := netAddr.(*net.UDPAddr).Port
Expect(mas).ToNot(Equal("/ip4/127.0.0.1/udp/0/quic")) Expect(port).ToNot(BeZero())
Expect(mas).To(ContainSubstring("/ip4/127.0.0.1/udp/")) Expect(ln.Multiaddr().String()).To(Equal(fmt.Sprintf("/ip4/127.0.0.1/udp/%d/quic", port)))
Expect(mas).To(ContainSubstring("/quic"))
}) })
It("closes", func() { It("returns Accept when it is closed", func() {
err := l.Close() addr, err := ma.NewMultiaddr("/ip4/127.0.0.1/udp/0/quic")
Expect(err).ToNot(HaveOccurred())
ln, err := t.Listen(addr)
Expect(err).ToNot(HaveOccurred()) Expect(err).ToNot(HaveOccurred())
Expect(quicListener.closed).To(BeTrue()) done := make(chan struct{})
go func() {
defer GinkgoRecover()
ln.Accept()
close(done)
}()
Consistently(done).ShouldNot(BeClosed())
Expect(ln.Close()).To(Succeed())
Eventually(done).Should(BeClosed())
}) })
Context("accepting", func() { It("doesn't accept Accept calls after it is closed", func() {
It("errors if no connection can be accepted", func() { ln, err := t.Listen(localAddr)
testErr := errors.New("test error") Expect(err).ToNot(HaveOccurred())
quicListener.acceptErr = testErr Expect(ln.Close()).To(Succeed())
_, err := l.Accept() _, err = ln.Accept()
Expect(err).To(MatchError(testErr)) Expect(err).To(HaveOccurred())
})
}) })
}) })
...@@ -2,7 +2,7 @@ package libp2pquic ...@@ -2,7 +2,7 @@ package libp2pquic
import ( import (
smux "github.com/libp2p/go-stream-muxer" smux "github.com/libp2p/go-stream-muxer"
"github.com/lucas-clemente/quic-go" quic "github.com/lucas-clemente/quic-go"
) )
type stream struct { type stream struct {
...@@ -12,6 +12,8 @@ type stream struct { ...@@ -12,6 +12,8 @@ type stream struct {
var _ smux.Stream = &stream{} var _ smux.Stream = &stream{}
func (s *stream) Reset() error { func (s *stream) Reset() error {
s.Stream.Reset(nil) if err := s.Stream.CancelRead(0); err != nil {
return nil return err
}
return s.Stream.CancelWrite(0)
} }
package libp2pquic package libp2pquic
import ( import (
"fmt" "context"
"sync" "errors"
ic "github.com/libp2p/go-libp2p-crypto"
peer "github.com/libp2p/go-libp2p-peer"
tpt "github.com/libp2p/go-libp2p-transport" tpt "github.com/libp2p/go-libp2p-transport"
quic "github.com/lucas-clemente/quic-go"
ma "github.com/multiformats/go-multiaddr" ma "github.com/multiformats/go-multiaddr"
manet "github.com/multiformats/go-multiaddr-net"
"github.com/whyrusleeping/mafmt" "github.com/whyrusleeping/mafmt"
) )
// QuicTransport implements a QUIC Transport var quicDialAddr = quic.DialAddr
type QuicTransport struct {
lmutex sync.Mutex
listeners map[string]tpt.Listener
dmutex sync.Mutex // The Transport implements the tpt.Transport interface for QUIC connections.
dialers map[string]tpt.Dialer type transport struct {
privKey ic.PrivKey
localPeer peer.ID
} }
var _ tpt.Transport = &QuicTransport{} var _ tpt.Transport = &transport{}
// NewQuicTransport creates a new QUIC Transport // NewTransport creates a new QUIC transport
// it tracks dialers and listeners created func NewTransport(key ic.PrivKey) (tpt.Transport, error) {
func NewQuicTransport() *QuicTransport { localPeer, err := peer.IDFromPrivateKey(key)
// utils.SetLogLevel(utils.LogLevelDebug) if err != nil {
return &QuicTransport{ return nil, err
listeners: make(map[string]tpt.Listener),
dialers: make(map[string]tpt.Dialer),
} }
return &transport{
privKey: key,
localPeer: localPeer,
}, nil
} }
func (t *QuicTransport) Dialer(laddr ma.Multiaddr, opts ...tpt.DialOpt) (tpt.Dialer, error) { // Dial dials a new QUIC connection
if !t.Matches(laddr) { func (t *transport) Dial(ctx context.Context, raddr ma.Multiaddr, p peer.ID) (tpt.Conn, error) {
return nil, fmt.Errorf("quic transport cannot dial %q", laddr) _, host, err := manet.DialArgs(raddr)
} if err != nil {
return nil, err
t.dmutex.Lock()
defer t.dmutex.Unlock()
s := laddr.String()
d, ok := t.dialers[s]
if ok {
return d, nil
} }
tlsConf, err := GenerateConfig(t.privKey)
// TODO: read opts
quicd, err := newDialer(t)
if err != nil { if err != nil {
return nil, err return nil, err
} }
t.dialers[s] = quicd sess, err := quicDialAddr(host, tlsConf, &quic.Config{Versions: []quic.VersionNumber{101}})
return quicd, nil if err != nil {
} return nil, err
// Listen starts listening on laddr
func (t *QuicTransport) Listen(laddr ma.Multiaddr) (tpt.Listener, error) {
if !t.Matches(laddr) {
return nil, fmt.Errorf("quic transport cannot listen on %q", laddr)
} }
remotePubKey, err := getRemotePubKey(sess)
t.lmutex.Lock() if err != nil {
defer t.lmutex.Unlock() return nil, err
l, ok := t.listeners[laddr.String()]
if ok {
return l, nil
} }
localMultiaddr, err := quicMultiaddr(sess.LocalAddr())
ln, err := newListener(laddr, t)
if err != nil { if err != nil {
return nil, err return nil, err
} }
if !p.MatchesPublicKey(remotePubKey) {
err := errors.New("peer IDs don't match")
sess.Close(err)
return nil, err
}
return &conn{
privKey: t.privKey,
localPeer: t.localPeer,
localMultiaddr: localMultiaddr,
remotePubKey: remotePubKey,
remotePeerID: p,
remoteMultiaddr: raddr,
}, nil
}
// CanDial determines if we can dial to an address
func (t *transport) CanDial(addr ma.Multiaddr) bool {
return mafmt.QUIC.Matches(addr)
}
t.listeners[laddr.String()] = ln // Listen listens for new QUIC connections on the passed multiaddr.
return ln, nil func (t *transport) Listen(addr ma.Multiaddr) (tpt.Listener, error) {
return newListener(addr, t, t.localPeer, t.privKey)
} }
func (t *QuicTransport) Matches(a ma.Multiaddr) bool { // Proxy returns true if this transport proxies.
return mafmt.QUIC.Matches(a) func (t *transport) Proxy() bool {
return false
} }
var _ tpt.Transport = &QuicTransport{} // Protocols returns the set of protocols handled by this transport.
func (t *transport) Protocols() []int {
return []int{ma.P_QUIC}
}
func (t *transport) String() string {
return "QUIC"
}
package libp2pquic package libp2pquic
import ( import (
tpt "github.com/libp2p/go-libp2p-transport"
ma "github.com/multiformats/go-multiaddr" ma "github.com/multiformats/go-multiaddr"
. "github.com/onsi/ginkgo" . "github.com/onsi/ginkgo"
...@@ -8,75 +9,24 @@ import ( ...@@ -8,75 +9,24 @@ import (
) )
var _ = Describe("Transport", func() { var _ = Describe("Transport", func() {
var t *QuicTransport var t tpt.Transport
BeforeEach(func() { BeforeEach(func() {
t = NewQuicTransport() t = &transport{}
}) })
Context("listening", func() { It("says if it can dial an address", func() {
It("creates a new listener", func() {
maddr, err := ma.NewMultiaddr("/ip4/127.0.0.1/udp/1234/quic")
Expect(err).ToNot(HaveOccurred())
ln, err := t.Listen(maddr)
Expect(err).ToNot(HaveOccurred())
Expect(ln.Multiaddr()).To(Equal(maddr))
})
It("returns an existing listener", func() {
maddr, err := ma.NewMultiaddr("/ip4/127.0.0.1/udp/1235/quic")
Expect(err).ToNot(HaveOccurred())
ln, err := t.Listen(maddr)
Expect(err).ToNot(HaveOccurred())
Expect(ln.Multiaddr()).To(Equal(maddr))
ln2, err := t.Listen(maddr)
Expect(err).ToNot(HaveOccurred())
Expect(ln2).To(Equal(ln))
Expect(t.listeners).To(HaveLen(1))
})
It("errors if the address is not a QUIC address", func() {
maddr, err := ma.NewMultiaddr("/ip4/127.0.0.1/udp/1235/utp")
Expect(err).ToNot(HaveOccurred())
_, err = t.Listen(maddr)
Expect(err).To(MatchError("quic transport cannot listen on \"/ip4/127.0.0.1/udp/1235/utp\""))
})
})
Context("dialing", func() {
It("creates a new dialer", func() {
maddr, err := ma.NewMultiaddr("/ip4/127.0.0.1/udp/1234/quic")
Expect(err).ToNot(HaveOccurred())
d, err := t.Dialer(maddr)
Expect(err).ToNot(HaveOccurred())
Expect(d).ToNot(BeNil())
})
It("returns an existing dialer", func() {
maddr, err := ma.NewMultiaddr("/ip4/127.0.0.1/udp/1235/quic")
Expect(err).ToNot(HaveOccurred())
d, err := t.Dialer(maddr)
Expect(err).ToNot(HaveOccurred())
d2, err := t.Dialer(maddr)
Expect(err).ToNot(HaveOccurred())
Expect(d2).To(Equal(d))
Expect(t.dialers).To(HaveLen(1))
})
It("errors if the address is not a QUIC address", func() {
maddr, err := ma.NewMultiaddr("/ip4/127.0.0.1/udp/1235/utp")
Expect(err).ToNot(HaveOccurred())
_, err = t.Dialer(maddr)
Expect(err).To(MatchError("quic transport cannot dial \"/ip4/127.0.0.1/udp/1235/utp\""))
})
})
It("matches", func() {
invalidAddr, err := ma.NewMultiaddr("/ip4/127.0.0.1/udp/1234") invalidAddr, err := ma.NewMultiaddr("/ip4/127.0.0.1/udp/1234")
Expect(err).ToNot(HaveOccurred()) Expect(err).ToNot(HaveOccurred())
validAddr, err := ma.NewMultiaddr("/ip4/127.0.0.1/udp/1234/quic") validAddr, err := ma.NewMultiaddr("/ip4/127.0.0.1/udp/1234/quic")
Expect(err).ToNot(HaveOccurred()) Expect(err).ToNot(HaveOccurred())
Expect(t.Matches(invalidAddr)).To(BeFalse()) Expect(t.CanDial(invalidAddr)).To(BeFalse())
Expect(t.Matches(validAddr)).To(BeTrue()) Expect(t.CanDial(validAddr)).To(BeTrue())
})
It("supports the QUIC protocol", func() {
protocols := t.Protocols()
Expect(protocols).To(HaveLen(1))
Expect(protocols[0]).To(Equal(ma.P_QUIC))
}) })
}) })
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment