Commit 714ac78c authored by Can ZHANG's avatar Can ZHANG
Browse files

QUIC-based hole punching ready for test

parent 5d72a118
......@@ -15,6 +15,7 @@ import (
protocol "github.com/libp2p/go-libp2p-protocol"
identify "github.com/libp2p/go-libp2p/p2p/protocol/identify"
ping "github.com/libp2p/go-libp2p/p2p/protocol/ping"
punching "github.com/libp2p/go-libp2p/p2p/protocol/punching"
ma "github.com/multiformats/go-multiaddr"
madns "github.com/multiformats/go-multiaddr-dns"
msmux "github.com/multiformats/go-multistream"
......@@ -56,7 +57,7 @@ const NATPortMap Option = iota
type BasicHost struct {
network inet.Network
mux *msmux.MultistreamMuxer
ids *identify.IDService
Ids *identify.IDService
pings *ping.PingService
natmgr NATManager
maResolver *madns.Resolver
......@@ -126,10 +127,10 @@ func NewHost(ctx context.Context, net inet.Network, opts *HostOpts) (*BasicHost,
}
if opts.IdentifyService != nil {
h.ids = opts.IdentifyService
h.Ids = opts.IdentifyService
} else {
// we can't set this as a default above because it depends on the *BasicHost.
h.ids = identify.NewIDService(h)
h.Ids = identify.NewIDService(h)
}
if uint64(opts.NegotiationTimeout) != 0 {
......@@ -159,6 +160,9 @@ func NewHost(ctx context.Context, net inet.Network, opts *HostOpts) (*BasicHost,
h.pings = ping.NewPingService(h)
}
// FIXME move out of basic_host
punching.NewPunchingService(h, h.Ids)
net.SetConnHandler(h.newConnHandler)
net.SetStreamHandler(h.newStreamHandler)
return h, nil
......@@ -206,7 +210,7 @@ func (h *BasicHost) newConnHandler(c inet.Conn) {
// Clear protocols on connecting to new peer to avoid issues caused
// by misremembering protocols between reconnects
h.Peerstore().SetProtocols(c.RemotePeer())
h.ids.IdentifyConn(c)
h.Ids.IdentifyConn(c)
}
// newStreamHandler is the remote-opened stream handler for inet.Network
......@@ -260,7 +264,7 @@ func (h *BasicHost) newStreamHandler(s inet.Stream) {
// PushIdentify pushes an identify update through the identify push protocol
// Warning: this interface is unstable and may disappear in the future.
func (h *BasicHost) PushIdentify() {
h.ids.Push()
h.Ids.Push()
}
// ID returns the (local) peer.ID associated with this Host
......@@ -285,7 +289,7 @@ func (h *BasicHost) Mux() *msmux.MultistreamMuxer {
// IDService returns
func (h *BasicHost) IDService() *identify.IDService {
return h.ids
return h.Ids
}
// SetStreamHandler sets the protocol handler on the Host's Mux.
......@@ -459,7 +463,7 @@ func (h *BasicHost) dialPeer(ctx context.Context, p peer.ID) error {
// identify the connection before returning.
done := make(chan struct{})
go func() {
h.ids.IdentifyConn(c)
h.Ids.IdentifyConn(c)
close(done)
}()
......@@ -508,9 +512,9 @@ func (h *BasicHost) AllAddrs() []ma.Multiaddr {
log.Debug("error retrieving network interface addrs")
}
var observedAddrs []ma.Multiaddr
if h.ids != nil {
if h.Ids != nil {
// peer observed addresses
observedAddrs = h.ids.OwnObservedAddrs()
observedAddrs = h.Ids.OwnObservedAddrs()
}
var natAddrs []ma.Multiaddr
// natmgr is nil if we do not use nat option;
......
......@@ -215,13 +215,13 @@ func TestHostProtoPreknowledge(t *testing.T) {
// wait for identify handshake to finish completely
select {
case <-h1.ids.IdentifyWait(h1.Network().ConnsToPeer(h2.ID())[0]):
case <-h1.Ids.IdentifyWait(h1.Network().ConnsToPeer(h2.ID())[0]):
case <-time.After(time.Second * 5):
t.Fatal("timed out waiting for identify")
}
select {
case <-h2.ids.IdentifyWait(h2.Network().ConnsToPeer(h1.ID())[0]):
case <-h2.Ids.IdentifyWait(h2.Network().ConnsToPeer(h1.ID())[0]):
case <-time.After(time.Second * 5):
t.Fatal("timed out waiting for identify")
}
......
......@@ -92,7 +92,9 @@ func (h *AutoRelayHost) background(ctx context.Context) {
for {
wait := autonat.AutoNATRefreshInterval
switch h.autonat.Status() {
status := h.autonat.Status()
h.Ids.SetNatStatus(status)
switch status {
case autonat.NATStatusUnknown:
wait = autonat.AutoNATRetryInterval
case autonat.NATStatusPublic:
......
......@@ -2,19 +2,20 @@ package identify
import (
"context"
"net"
"sync"
"time"
pb "github.com/libp2p/go-libp2p/p2p/protocol/identify/pb"
ggio "github.com/gogo/protobuf/io"
logging "github.com/ipfs/go-log"
autonat "github.com/libp2p/go-libp2p-autonat"
ic "github.com/libp2p/go-libp2p-crypto"
host "github.com/libp2p/go-libp2p-host"
lgbl "github.com/libp2p/go-libp2p-loggables"
inet "github.com/libp2p/go-libp2p-net"
peer "github.com/libp2p/go-libp2p-peer"
pstore "github.com/libp2p/go-libp2p-peerstore"
pb "github.com/libp2p/go-libp2p/p2p/protocol/identify/pb"
ma "github.com/multiformats/go-multiaddr"
msmux "github.com/multiformats/go-multistream"
)
......@@ -54,6 +55,9 @@ type IDService struct {
// our own observed addresses.
// TODO: instead of expiring, remove these when we disconnect
observedAddrs ObservedAddrSet
// NAT status
natStatus pb.Identify_NATStatus
}
// NewIDService constructs a new *IDService and activates it by
......@@ -163,6 +167,21 @@ func (ids *IDService) Push() {
}
}
func (ids *IDService) SetNatStatus(status autonat.NATStatus) {
switch status {
case autonat.NATStatusPrivate:
ids.natStatus = pb.Identify_NATStatusPrivate
case autonat.NATStatusPublic:
ids.natStatus = pb.Identify_NATStatusPublic
default:
ids.natStatus = pb.Identify_NATStatusUnknown
}
}
func (ids *IDService) GetNatStatus() pb.Identify_NATStatus {
return ids.natStatus
}
func (ids *IDService) populateMessage(mes *pb.Identify, c inet.Conn) {
// set protocols this node is currently handling
......@@ -201,6 +220,9 @@ func (ids *IDService) populateMessage(mes *pb.Identify, c inet.Conn) {
av := ClientVersion
mes.ProtocolVersion = &pv
mes.AgentVersion = &av
// set if behind NAT when possible
mes.NatStatus = &ids.natStatus
}
func (ids *IDService) consumeMessage(mes *pb.Identify, c inet.Conn) {
......@@ -209,6 +231,8 @@ func (ids *IDService) consumeMessage(mes *pb.Identify, c inet.Conn) {
// mes.Protocols
ids.Host.Peerstore().SetProtocols(p, mes.Protocols...)
ids.Host.Peerstore().Put(p, "natStatus", mes.GetNatStatus())
// mes.ObservedAddr
ids.consumeObservedAddress(mes.GetObservedAddr(), c)
......
......@@ -20,6 +20,45 @@ var _ = math.Inf
// proto package needs to be updated.
const _ = proto.GoGoProtoPackageIsVersion2 // please upgrade the proto package
type Identify_NATStatus int32
const (
Identify_NATStatusUnknown Identify_NATStatus = 0
Identify_NATStatusPublic Identify_NATStatus = 1
Identify_NATStatusPrivate Identify_NATStatus = 2
)
var Identify_NATStatus_name = map[int32]string{
0: "NATStatusUnknown",
1: "NATStatusPublic",
2: "NATStatusPrivate",
}
var Identify_NATStatus_value = map[string]int32{
"NATStatusUnknown": 0,
"NATStatusPublic": 1,
"NATStatusPrivate": 2,
}
func (x Identify_NATStatus) Enum() *Identify_NATStatus {
p := new(Identify_NATStatus)
*p = x
return p
}
func (x Identify_NATStatus) String() string {
return proto.EnumName(Identify_NATStatus_name, int32(x))
}
func (x *Identify_NATStatus) UnmarshalJSON(data []byte) error {
value, err := proto.UnmarshalJSONEnum(Identify_NATStatus_value, data, "Identify_NATStatus")
if err != nil {
return err
}
*x = Identify_NATStatus(value)
return nil
}
func (Identify_NATStatus) EnumDescriptor() ([]byte, []int) {
return fileDescriptor_identify_abe757650f0e8336, []int{0, 0}
}
type Identify struct {
// protocolVersion determines compatibility between peers
ProtocolVersion *string `protobuf:"bytes,5,opt,name=protocolVersion" json:"protocolVersion,omitempty"`
......@@ -37,17 +76,18 @@ type Identify struct {
// determine whether its connection to the local peer goes through NAT.
ObservedAddr []byte `protobuf:"bytes,4,opt,name=observedAddr" json:"observedAddr,omitempty"`
// protocols are the services this node is running
Protocols []string `protobuf:"bytes,3,rep,name=protocols" json:"protocols,omitempty"`
XXX_NoUnkeyedLiteral struct{} `json:"-"`
XXX_unrecognized []byte `json:"-"`
XXX_sizecache int32 `json:"-"`
Protocols []string `protobuf:"bytes,3,rep,name=protocols" json:"protocols,omitempty"`
NatStatus *Identify_NATStatus `protobuf:"varint,7,opt,name=natStatus,enum=identify.pb.Identify_NATStatus" json:"natStatus,omitempty"`
XXX_NoUnkeyedLiteral struct{} `json:"-"`
XXX_unrecognized []byte `json:"-"`
XXX_sizecache int32 `json:"-"`
}
func (m *Identify) Reset() { *m = Identify{} }
func (m *Identify) String() string { return proto.CompactTextString(m) }
func (*Identify) ProtoMessage() {}
func (*Identify) Descriptor() ([]byte, []int) {
return fileDescriptor_identify_daaec8baf46eae80, []int{0}
return fileDescriptor_identify_abe757650f0e8336, []int{0}
}
func (m *Identify) XXX_Unmarshal(b []byte) error {
return m.Unmarshal(b)
......@@ -118,8 +158,16 @@ func (m *Identify) GetProtocols() []string {
return nil
}
func (m *Identify) GetNatStatus() Identify_NATStatus {
if m != nil && m.NatStatus != nil {
return *m.NatStatus
}
return Identify_NATStatusUnknown
}
func init() {
proto.RegisterType((*Identify)(nil), "identify.pb.Identify")
proto.RegisterEnum("identify.pb.Identify_NATStatus", Identify_NATStatus_name, Identify_NATStatus_value)
}
func (m *Identify) Marshal() (dAtA []byte, err error) {
size := m.Size()
......@@ -183,6 +231,11 @@ func (m *Identify) MarshalTo(dAtA []byte) (int, error) {
i = encodeVarintIdentify(dAtA, i, uint64(len(*m.AgentVersion)))
i += copy(dAtA[i:], *m.AgentVersion)
}
if m.NatStatus != nil {
dAtA[i] = 0x38
i++
i = encodeVarintIdentify(dAtA, i, uint64(*m.NatStatus))
}
if m.XXX_unrecognized != nil {
i += copy(dAtA[i:], m.XXX_unrecognized)
}
......@@ -199,6 +252,9 @@ func encodeVarintIdentify(dAtA []byte, offset int, v uint64) int {
return offset + 1
}
func (m *Identify) Size() (n int) {
if m == nil {
return 0
}
var l int
_ = l
if m.PublicKey != nil {
......@@ -229,6 +285,9 @@ func (m *Identify) Size() (n int) {
l = len(*m.AgentVersion)
n += 1 + l + sovIdentify(uint64(l))
}
if m.NatStatus != nil {
n += 1 + sovIdentify(uint64(*m.NatStatus))
}
if m.XXX_unrecognized != nil {
n += len(m.XXX_unrecognized)
}
......@@ -457,6 +516,26 @@ func (m *Identify) Unmarshal(dAtA []byte) error {
s := string(dAtA[iNdEx:postIndex])
m.AgentVersion = &s
iNdEx = postIndex
case 7:
if wireType != 0 {
return fmt.Errorf("proto: wrong wireType = %d for field NatStatus", wireType)
}
var v Identify_NATStatus
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowIdentify
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
v |= (Identify_NATStatus(b) & 0x7F) << shift
if b < 0x80 {
break
}
}
m.NatStatus = &v
default:
iNdEx = preIndex
skippy, err := skipIdentify(dAtA[iNdEx:])
......@@ -584,20 +663,25 @@ var (
ErrIntOverflowIdentify = fmt.Errorf("proto: integer overflow")
)
func init() { proto.RegisterFile("identify.proto", fileDescriptor_identify_daaec8baf46eae80) }
func init() { proto.RegisterFile("identify.proto", fileDescriptor_identify_abe757650f0e8336) }
var fileDescriptor_identify_daaec8baf46eae80 = []byte{
// 187 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xe2, 0xe2, 0xcb, 0x4c, 0x49, 0xcd,
0x2b, 0xc9, 0x4c, 0xab, 0xd4, 0x2b, 0x28, 0xca, 0x2f, 0xc9, 0x17, 0xe2, 0x46, 0xf0, 0x93, 0x94,
0x6e, 0x31, 0x72, 0x71, 0x78, 0x42, 0xf9, 0x42, 0x32, 0x5c, 0x9c, 0x05, 0xa5, 0x49, 0x39, 0x99,
0xc9, 0xde, 0xa9, 0x95, 0x12, 0x8c, 0x0a, 0x8c, 0x1a, 0x3c, 0x41, 0x08, 0x01, 0x21, 0x05, 0x2e,
0xee, 0x9c, 0xcc, 0xe2, 0x92, 0xd4, 0x3c, 0xc7, 0x94, 0x94, 0xa2, 0x62, 0x09, 0x26, 0x05, 0x66,
0x0d, 0x9e, 0x20, 0x64, 0x21, 0xb0, 0x7e, 0x90, 0x15, 0xc9, 0xf9, 0x39, 0xc5, 0x12, 0xcc, 0x0a,
0xcc, 0x1a, 0x9c, 0x41, 0x08, 0x01, 0x21, 0x25, 0x2e, 0x9e, 0xfc, 0xa4, 0xe2, 0xd4, 0xa2, 0xb2,
0xd4, 0x14, 0x90, 0x72, 0x09, 0x16, 0xb0, 0x05, 0x28, 0x62, 0x42, 0x1a, 0x5c, 0xfc, 0x30, 0x0d,
0x61, 0xa9, 0x45, 0xc5, 0x99, 0xf9, 0x79, 0x12, 0xac, 0x0a, 0x8c, 0x1a, 0x9c, 0x41, 0xe8, 0xc2,
0x20, 0xd3, 0x12, 0xd3, 0x53, 0xf3, 0x4a, 0x60, 0xca, 0xd8, 0xc0, 0xca, 0x50, 0xc4, 0x9c, 0x78,
0x4e, 0x3c, 0x92, 0x63, 0xbc, 0xf0, 0x48, 0x8e, 0xf1, 0xc1, 0x23, 0x39, 0x46, 0x40, 0x00, 0x00,
0x00, 0xff, 0xff, 0x4b, 0x9c, 0x90, 0x7a, 0x08, 0x01, 0x00, 0x00,
var fileDescriptor_identify_abe757650f0e8336 = []byte{
// 262 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x5c, 0x90, 0xbb, 0x4e, 0xc3, 0x30,
0x18, 0x85, 0x71, 0xc2, 0xcd, 0x7f, 0xa3, 0x36, 0x32, 0x0c, 0x1e, 0x50, 0xb0, 0x32, 0x79, 0xca,
0xc0, 0xce, 0x50, 0x36, 0x04, 0x42, 0x95, 0xb9, 0xec, 0x49, 0x63, 0x90, 0x45, 0x64, 0x57, 0xb6,
0x5b, 0xd4, 0x37, 0x64, 0xe4, 0x11, 0x50, 0x16, 0x5e, 0x03, 0x25, 0x90, 0xa4, 0xe9, 0x78, 0x3e,
0x7d, 0xc7, 0xbf, 0x8e, 0x61, 0xaa, 0x4a, 0xa9, 0xbd, 0x7a, 0xdd, 0x66, 0x2b, 0x6b, 0xbc, 0x21,
0x93, 0x21, 0x17, 0xe9, 0x4f, 0x00, 0xa7, 0xb7, 0xff, 0x99, 0x5c, 0x00, 0x5e, 0xad, 0x8b, 0x4a,
0x2d, 0xef, 0xe4, 0x96, 0x22, 0x86, 0x78, 0x24, 0x06, 0x40, 0x18, 0x4c, 0x2a, 0xe5, 0xbc, 0xd4,
0xf3, 0xb2, 0xb4, 0x8e, 0x06, 0x2c, 0xe4, 0x91, 0xd8, 0x45, 0x6d, 0xbf, 0x39, 0xb1, 0x34, 0x95,
0xa3, 0x21, 0x0b, 0x39, 0x16, 0x03, 0x20, 0x29, 0x44, 0xa6, 0x70, 0xd2, 0x6e, 0x64, 0xd9, 0xe8,
0xf4, 0xb0, 0x3d, 0x30, 0x62, 0x84, 0xc3, 0xac, 0x2b, 0xbc, 0x48, 0xeb, 0x94, 0xd1, 0xf4, 0x88,
0x21, 0x8e, 0xc5, 0x3e, 0x6e, 0x5e, 0xcb, 0xdf, 0xa4, 0xf6, 0x9d, 0x76, 0xdc, 0x6a, 0x23, 0x46,
0xae, 0x01, 0xeb, 0xdc, 0x3f, 0xfa, 0xdc, 0xaf, 0x1d, 0x3d, 0x61, 0x88, 0x4f, 0xaf, 0x2e, 0xb3,
0x9d, 0xf5, 0x59, 0xb7, 0x3c, 0x7b, 0x98, 0x3f, 0xfd, 0x69, 0x62, 0x68, 0xa4, 0xf7, 0x80, 0x7b,
0x4e, 0xce, 0x21, 0xee, 0xc3, 0xb3, 0x7e, 0xd7, 0xe6, 0x43, 0xc7, 0x07, 0xe4, 0x0c, 0x66, 0x3d,
0x5d, 0xb4, 0x3f, 0x15, 0xa3, 0x91, 0xba, 0xb0, 0x6a, 0x93, 0x7b, 0x19, 0x07, 0x37, 0xd1, 0x67,
0x9d, 0xa0, 0xaf, 0x3a, 0x41, 0xdf, 0x75, 0x82, 0x7e, 0x03, 0x00, 0x00, 0xff, 0xff, 0xca, 0x4d,
0x42, 0xce, 0x95, 0x01, 0x00, 0x00,
}
......@@ -26,4 +26,12 @@ message Identify {
// protocols are the services this node is running
repeated string protocols = 3;
enum NATStatus {
NATStatusUnknown = 0;
NATStatusPublic = 1;
NATStatusPrivate = 2;
};
optional NATStatus natStatus = 7;
}
// Code generated by protoc-gen-gogo. DO NOT EDIT.
// source: punching.proto
package punching_pb
import proto "github.com/gogo/protobuf/proto"
import fmt "fmt"
import math "math"
import io "io"
// Reference imports to suppress errors if they are not otherwise used.
var _ = proto.Marshal
var _ = fmt.Errorf
var _ = math.Inf
// This is a compile-time assertion to ensure that this generated file
// is compatible with the proto package it is being compiled against.
// A compilation error at this line likely means your copy of the
// proto package needs to be updated.
const _ = proto.GoGoProtoPackageIsVersion2 // please upgrade the proto package
type Punching struct {
// addresses for peer to test connectivity
Addresses []string `protobuf:"bytes,1,rep,name=addresses" json:"addresses,omitempty"`
XXX_NoUnkeyedLiteral struct{} `json:"-"`
XXX_unrecognized []byte `json:"-"`
XXX_sizecache int32 `json:"-"`
}
func (m *Punching) Reset() { *m = Punching{} }
func (m *Punching) String() string { return proto.CompactTextString(m) }
func (*Punching) ProtoMessage() {}
func (*Punching) Descriptor() ([]byte, []int) {
return fileDescriptor_punching_ccf7b972b2451d9b, []int{0}
}
func (m *Punching) XXX_Unmarshal(b []byte) error {
return m.Unmarshal(b)
}
func (m *Punching) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
if deterministic {
return xxx_messageInfo_Punching.Marshal(b, m, deterministic)
} else {
b = b[:cap(b)]
n, err := m.MarshalTo(b)
if err != nil {
return nil, err
}
return b[:n], nil
}
}
func (dst *Punching) XXX_Merge(src proto.Message) {
xxx_messageInfo_Punching.Merge(dst, src)
}
func (m *Punching) XXX_Size() int {
return m.Size()
}
func (m *Punching) XXX_DiscardUnknown() {
xxx_messageInfo_Punching.DiscardUnknown(m)
}
var xxx_messageInfo_Punching proto.InternalMessageInfo
func (m *Punching) GetAddresses() []string {
if m != nil {
return m.Addresses
}
return nil
}
func init() {
proto.RegisterType((*Punching)(nil), "punching.pb.Punching")
}
func (m *Punching) Marshal() (dAtA []byte, err error) {
size := m.Size()
dAtA = make([]byte, size)
n, err := m.MarshalTo(dAtA)
if err != nil {
return nil, err
}
return dAtA[:n], nil
}
func (m *Punching) MarshalTo(dAtA []byte) (int, error) {
var i int
_ = i
var l int
_ = l
if len(m.Addresses) > 0 {
for _, s := range m.Addresses {
dAtA[i] = 0xa
i++
l = len(s)
for l >= 1<<7 {
dAtA[i] = uint8(uint64(l)&0x7f | 0x80)
l >>= 7
i++
}
dAtA[i] = uint8(l)
i++
i += copy(dAtA[i:], s)
}
}
if m.XXX_unrecognized != nil {
i += copy(dAtA[i:], m.XXX_unrecognized)
}
return i, nil
}
func encodeVarintPunching(dAtA []byte, offset int, v uint64) int {
for v >= 1<<7 {
dAtA[offset] = uint8(v&0x7f | 0x80)
v >>= 7
offset++
}
dAtA[offset] = uint8(v)
return offset + 1
}
func (m *Punching) Size() (n int) {
if m == nil {
return 0
}
var l int
_ = l
if len(m.Addresses) > 0 {
for _, s := range m.Addresses {
l = len(s)
n += 1 + l + sovPunching(uint64(l))
}
}
if m.XXX_unrecognized != nil {
n += len(m.XXX_unrecognized)
}
return n
}
func sovPunching(x uint64) (n int) {
for {
n++
x >>= 7
if x == 0 {
break
}
}
return n
}
func sozPunching(x uint64) (n int) {
return sovPunching(uint64((x << 1) ^ uint64((int64(x) >> 63))))
}
func (m *Punching) Unmarshal(dAtA []byte) error {
l := len(dAtA)
iNdEx := 0
for iNdEx < l {
preIndex := iNdEx
var wire uint64
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowPunching
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
wire |= (uint64(b) & 0x7F) << shift
if b < 0x80 {
break
}
}
fieldNum := int32(wire >> 3)
wireType := int(wire & 0x7)
if wireType == 4 {
return fmt.Errorf("proto: Punching: wiretype end group for non-group")
}
if fieldNum <= 0 {
return fmt.Errorf("proto: Punching: illegal tag %d (wire type %d)", fieldNum, wire)
}
switch fieldNum {
case 1:
if wireType != 2 {
return fmt.Errorf("proto: wrong wireType = %d for field Addresses", wireType)
}
var stringLen uint64
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowPunching
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
stringLen |= (uint64(b) & 0x7F) << shift
if b < 0x80 {
break
}
}
intStringLen := int(stringLen)
if intStringLen < 0 {
return ErrInvalidLengthPunching
}
postIndex := iNdEx + intStringLen
if postIndex > l {
return io.ErrUnexpectedEOF
}
m.Addresses = append(m.Addresses, string(dAtA[iNdEx:postIndex]))
iNdEx = postIndex
default:
iNdEx = preIndex
skippy, err := skipPunching(dAtA[iNdEx:])
if err != nil {
return err
}
if skippy < 0 {
return ErrInvalidLengthPunching
}
if (iNdEx + skippy) > l {
return io.ErrUnexpectedEOF
}
m.XXX_unrecognized = append(m.XXX_unrecognized, dAtA[iNdEx:iNdEx+skippy]...)
iNdEx += skippy
}
}
if iNdEx > l {
return io.ErrUnexpectedEOF
}
return nil
}
func skipPunching(dAtA []byte) (n int, err error) {
l := len(dAtA)
iNdEx := 0
for iNdEx < l {
var wire uint64
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return 0, ErrIntOverflowPunching
}
if iNdEx >= l {
return 0, io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
wire |= (uint64(b) & 0x7F) << shift
if b < 0x80 {
break
}
}
wireType := int(wire & 0x7)
switch wireType {
case 0:
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return 0, ErrIntOverflowPunching
}
if iNdEx >= l {
return 0, io.ErrUnexpectedEOF
}
iNdEx++
if dAtA[iNdEx-1] < 0x80 {
break
}
}
return iNdEx, nil
case 1:
iNdEx += 8
return iNdEx, nil
case 2:
var length int
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return 0, ErrIntOverflowPunching
}
if iNdEx >= l {
return 0, io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
length |= (int(b) & 0x7F) << shift
if b < 0x80 {
break
}
}
iNdEx += length
if length < 0 {
return 0, ErrInvalidLengthPunching
}
return iNdEx, nil
case 3:
for {
var innerWire uint64
var start int = iNdEx
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return 0, ErrIntOverflowPunching
}
if iNdEx >= l {
return 0, io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
innerWire |= (uint64(b) & 0x7F) << shift
if b < 0x80 {
break
}
}
innerWireType := int(innerWire & 0x7)
if innerWireType == 4 {
break
}
next, err := skipPunching(dAtA[start:])
if err != nil {
return 0, err
}
iNdEx = start + next
}
return iNdEx, nil
case 4:
return iNdEx, nil
case 5:
iNdEx += 4
return iNdEx, nil
default:
return 0, fmt.Errorf("proto: illegal wireType %d", wireType)
}
}
panic("unreachable")
}
var (
ErrInvalidLengthPunching = fmt.Errorf("proto: negative length found during unmarshaling")
ErrIntOverflowPunching = fmt.Errorf("proto: integer overflow")
)
func init() { proto.RegisterFile("punching.proto", fileDescriptor_punching_ccf7b972b2451d9b) }
var fileDescriptor_punching_ccf7b972b2451d9b = []byte{
// 88 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xe2, 0xe2, 0x2b, 0x28, 0xcd, 0x4b,
0xce, 0xc8, 0xcc, 0x4b, 0xd7, 0x2b, 0x28, 0xca, 0x2f, 0xc9, 0x17, 0xe2, 0x46, 0xf0, 0x93, 0x94,
0x34, 0xb8, 0x38, 0x02, 0xa0, 0x5c, 0x21, 0x19, 0x2e, 0xce, 0xc4, 0x94, 0x94, 0xa2, 0xd4, 0xe2,
0xe2, 0xd4, 0x62, 0x09, 0x46, 0x05, 0x66, 0x0d, 0xce, 0x20, 0x84, 0x80, 0x13, 0xcf, 0x89, 0x47,
0x72, 0x8c, 0x17, 0x1e, 0xc9, 0x31, 0x3e, 0x78, 0x24, 0xc7, 0x08, 0x08, 0x00, 0x00, 0xff, 0xff,
0x41, 0xc4, 0x5a, 0xaf, 0x55, 0x00, 0x00, 0x00,
}
syntax = "proto2";
package punching.pb;
message Punching {
// addresses for peer to test connectivity
repeated string addresses = 1;
}
\ No newline at end of file
package punching
import (
"context"
"strings"
"time"
ggio "github.com/gogo/protobuf/io"
logging "github.com/ipfs/go-log"
host "github.com/libp2p/go-libp2p-host"
inet "github.com/libp2p/go-libp2p-net"
peer "github.com/libp2p/go-libp2p-peer"
pstore "github.com/libp2p/go-libp2p-peerstore"
swarm "github.com/libp2p/go-libp2p-swarm"
identify "github.com/libp2p/go-libp2p/p2p/protocol/identify"
identify_pb "github.com/libp2p/go-libp2p/p2p/protocol/identify/pb"
punching_pb "github.com/libp2p/go-libp2p/p2p/protocol/punching/pb"
ma "github.com/multiformats/go-multiaddr"
)
var log = logging.Logger("punching")
const PunchingProtocol = "/libp2p/punching/1.0.0"
type PunchingService struct {
host host.Host
idService *identify.IDService
}
func NewPunchingService(h host.Host, idService *identify.IDService) *PunchingService {
punchingService := &PunchingService{
host: h,
idService: idService,
}
h.SetStreamHandler(PunchingProtocol, punchingService.PunchingHandler)
h.Network().Notify(punchingService)
return punchingService
}
func (p *PunchingService) PunchingHandler(s inet.Stream) {
log.Debug("Incoming Punching Request", s.Protocol(), s.Conn().RemoteMultiaddr().String())
r := ggio.NewDelimitedReader(s, inet.MessageSizeMax)
var incoming punching_pb.Punching
err := r.ReadMsg(&incoming)
if err != nil {
log.Debug("err read punching_pb.Punching:", err)
return
}
log.Debug("Incoming peer addresses:", incoming.Addresses)
if len(incoming.Addresses) == 0 {
return
}
go func() {
maddrs := make([]ma.Multiaddr, 0, len(incoming.Addresses))
for _, a := range incoming.Addresses {
maddr, err := ma.NewMultiaddr(a)
if err != nil {
log.Debug("Cannot convert", a, "to multiaddress:", err)
continue
}
maddrs = append(maddrs, maddr)
}
if len(maddrs) == 0 {
return
}
peerInfo := pstore.PeerInfo{
ID: s.Conn().RemotePeer(),
Addrs: maddrs,
}
timeCount := 0
for timeCount < 5 {
timeCount += 1
// copied from go-ipfs/core/coreapi/swarm
if swrm, ok := p.host.Network().(*swarm.Swarm); ok {
swrm.Backoff().Clear(peerInfo.ID)
}
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
err := p.host.Connect(ctx, peerInfo)
cancel()
if err != nil {
log.Info("Failed to connect", peerInfo.ID, "err:", err)
} else {
log.Info("Punch-connect to", incoming.Addresses, "successful")
return
}
time.Sleep(10 * time.Second)
}
}()
}
func (p *PunchingService) punch(peerID peer.ID) {
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
stream, err := p.host.NewStream(ctx, peerID, PunchingProtocol)
cancel()
if err != nil {
log.Debug("Failed to create new stream to", peerID)
return
}
allAddresses := p.host.Addrs()
quicAddress := make([]string, 0, len(allAddresses))
for _, a := range allAddresses {
s := a.String()
if strings.Contains(s, "/quic") &&
!strings.Contains(s, "/ip6") &&
!strings.Contains(s, "/127.0.0.1") &&
!strings.Contains(s, "/::") &&
!strings.Contains(s, "/192.168.") &&
!strings.Contains(s, "/10.") &&
!strings.Contains(s, "/172.") {
quicAddress = append(quicAddress, s)
}
}
if len(quicAddress) == 0 {
return
}
log.Debug("QUIC addresses:", quicAddress)
msg := punching_pb.Punching{
Addresses: quicAddress,
}
w := ggio.NewDelimitedWriter(stream)
err = w.WriteMsg(&msg)
if err != nil {
log.Debug("err write punching_pb.Punching:", err)
return
}
}
//
// Implement inet.Notifiee
//
func (p *PunchingService) Listen(inet.Network, ma.Multiaddr) {} // called when network starts listening on an addr
func (p *PunchingService) ListenClose(inet.Network, ma.Multiaddr) {} // called when network starts listening on an addr
func (p *PunchingService) Connected(net inet.Network, conn inet.Conn) { // called when a connection opened
go func() {
time.Sleep(5 * time.Second)
log.Debug("Connected", conn.LocalMultiaddr().String(), "<==>", conn.RemoteMultiaddr().String())
peerID := conn.RemotePeer()
protos, err := p.host.Peerstore().SupportsProtocols(peerID, PunchingProtocol)
if err != nil {
log.Debug("error retrieving supported protocols for peer %s: %s\n", peerID, err)
return
}
if len(protos) > 0 {
selfNatStatus := p.idService.GetNatStatus()
var peerNatStatus identify_pb.Identify_NATStatus
peerNatStatusRaw, err := p.host.Peerstore().Get(peerID, "natStatus")
if err == nil {
peerNatStatus = peerNatStatusRaw.(identify_pb.Identify_NATStatus)
}
log.Info("Discovered punching-support peer", peerID.String(),
"self NAT status", selfNatStatus,
"peer NAT status", peerNatStatus)
if selfNatStatus == identify_pb.Identify_NATStatusPublic ||
peerNatStatus == identify_pb.Identify_NATStatusPublic {
return
}
p.punch(peerID)
}
}()
}
func (p *PunchingService) Disconnected(inet.Network, inet.Conn) {} // called when a connection closed
func (p *PunchingService) OpenedStream(net inet.Network, stream inet.Stream) { // called when a stream opened
go func() {
time.Sleep(5 * time.Second)
log.Debug("Connected",
stream.Conn().LocalMultiaddr().String(), "<==>", stream.Conn().RemoteMultiaddr().String(),
"protocol", stream.Protocol())
}()
}
func (p *PunchingService) ClosedStream(inet.Network, inet.Stream) {} // called when a stream closed
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment