diff --git a/p2p/host/basic/basic_host.go b/p2p/host/basic/basic_host.go index 13452e2f9cf28d2492b3efd227cbd15ce4990b60..cc8147ccbc0393c439c97b799aba6b72687261b6 100644 --- a/p2p/host/basic/basic_host.go +++ b/p2p/host/basic/basic_host.go @@ -15,6 +15,7 @@ import ( protocol "github.com/libp2p/go-libp2p-protocol" identify "github.com/libp2p/go-libp2p/p2p/protocol/identify" ping "github.com/libp2p/go-libp2p/p2p/protocol/ping" + punching "github.com/libp2p/go-libp2p/p2p/protocol/punching" ma "github.com/multiformats/go-multiaddr" madns "github.com/multiformats/go-multiaddr-dns" msmux "github.com/multiformats/go-multistream" @@ -56,7 +57,7 @@ const NATPortMap Option = iota type BasicHost struct { network inet.Network mux *msmux.MultistreamMuxer - ids *identify.IDService + Ids *identify.IDService pings *ping.PingService natmgr NATManager maResolver *madns.Resolver @@ -126,10 +127,10 @@ func NewHost(ctx context.Context, net inet.Network, opts *HostOpts) (*BasicHost, } if opts.IdentifyService != nil { - h.ids = opts.IdentifyService + h.Ids = opts.IdentifyService } else { // we can't set this as a default above because it depends on the *BasicHost. - h.ids = identify.NewIDService(h) + h.Ids = identify.NewIDService(h) } if uint64(opts.NegotiationTimeout) != 0 { @@ -159,6 +160,9 @@ func NewHost(ctx context.Context, net inet.Network, opts *HostOpts) (*BasicHost, h.pings = ping.NewPingService(h) } + // FIXME move out of basic_host + punching.NewPunchingService(h, h.Ids) + net.SetConnHandler(h.newConnHandler) net.SetStreamHandler(h.newStreamHandler) return h, nil @@ -206,7 +210,7 @@ func (h *BasicHost) newConnHandler(c inet.Conn) { // Clear protocols on connecting to new peer to avoid issues caused // by misremembering protocols between reconnects h.Peerstore().SetProtocols(c.RemotePeer()) - h.ids.IdentifyConn(c) + h.Ids.IdentifyConn(c) } // newStreamHandler is the remote-opened stream handler for inet.Network @@ -260,7 +264,7 @@ func (h *BasicHost) newStreamHandler(s inet.Stream) { // PushIdentify pushes an identify update through the identify push protocol // Warning: this interface is unstable and may disappear in the future. func (h *BasicHost) PushIdentify() { - h.ids.Push() + h.Ids.Push() } // ID returns the (local) peer.ID associated with this Host @@ -285,7 +289,7 @@ func (h *BasicHost) Mux() *msmux.MultistreamMuxer { // IDService returns func (h *BasicHost) IDService() *identify.IDService { - return h.ids + return h.Ids } // SetStreamHandler sets the protocol handler on the Host's Mux. @@ -459,7 +463,7 @@ func (h *BasicHost) dialPeer(ctx context.Context, p peer.ID) error { // identify the connection before returning. done := make(chan struct{}) go func() { - h.ids.IdentifyConn(c) + h.Ids.IdentifyConn(c) close(done) }() @@ -508,9 +512,9 @@ func (h *BasicHost) AllAddrs() []ma.Multiaddr { log.Debug("error retrieving network interface addrs") } var observedAddrs []ma.Multiaddr - if h.ids != nil { + if h.Ids != nil { // peer observed addresses - observedAddrs = h.ids.OwnObservedAddrs() + observedAddrs = h.Ids.OwnObservedAddrs() } var natAddrs []ma.Multiaddr // natmgr is nil if we do not use nat option; diff --git a/p2p/host/basic/basic_host_test.go b/p2p/host/basic/basic_host_test.go index affdec7b3a90f117aa9e83ce77d22a3d2c891095..0b3f066a191e2f18168c4162ce19bc01b9f62557 100644 --- a/p2p/host/basic/basic_host_test.go +++ b/p2p/host/basic/basic_host_test.go @@ -215,13 +215,13 @@ func TestHostProtoPreknowledge(t *testing.T) { // wait for identify handshake to finish completely select { - case <-h1.ids.IdentifyWait(h1.Network().ConnsToPeer(h2.ID())[0]): + case <-h1.Ids.IdentifyWait(h1.Network().ConnsToPeer(h2.ID())[0]): case <-time.After(time.Second * 5): t.Fatal("timed out waiting for identify") } select { - case <-h2.ids.IdentifyWait(h2.Network().ConnsToPeer(h1.ID())[0]): + case <-h2.Ids.IdentifyWait(h2.Network().ConnsToPeer(h1.ID())[0]): case <-time.After(time.Second * 5): t.Fatal("timed out waiting for identify") } diff --git a/p2p/host/relay/autorelay.go b/p2p/host/relay/autorelay.go index e97ce883216ed6a9a92b51f8e0686cfb4d9f095b..3813ea0a6aae674cb1f955d0a18a654f562d2874 100644 --- a/p2p/host/relay/autorelay.go +++ b/p2p/host/relay/autorelay.go @@ -92,7 +92,9 @@ func (h *AutoRelayHost) background(ctx context.Context) { for { wait := autonat.AutoNATRefreshInterval - switch h.autonat.Status() { + status := h.autonat.Status() + h.Ids.SetNatStatus(status) + switch status { case autonat.NATStatusUnknown: wait = autonat.AutoNATRetryInterval case autonat.NATStatusPublic: diff --git a/p2p/protocol/identify/id.go b/p2p/protocol/identify/id.go index 86bfada0682db59117538d8aaf82f1d965566c55..aadf7f8a274df6d0a0b8a38e6cf8c43ea38076db 100644 --- a/p2p/protocol/identify/id.go +++ b/p2p/protocol/identify/id.go @@ -2,19 +2,20 @@ package identify import ( "context" + "net" "sync" "time" - pb "github.com/libp2p/go-libp2p/p2p/protocol/identify/pb" - ggio "github.com/gogo/protobuf/io" logging "github.com/ipfs/go-log" + autonat "github.com/libp2p/go-libp2p-autonat" ic "github.com/libp2p/go-libp2p-crypto" host "github.com/libp2p/go-libp2p-host" lgbl "github.com/libp2p/go-libp2p-loggables" inet "github.com/libp2p/go-libp2p-net" peer "github.com/libp2p/go-libp2p-peer" pstore "github.com/libp2p/go-libp2p-peerstore" + pb "github.com/libp2p/go-libp2p/p2p/protocol/identify/pb" ma "github.com/multiformats/go-multiaddr" msmux "github.com/multiformats/go-multistream" ) @@ -54,6 +55,9 @@ type IDService struct { // our own observed addresses. // TODO: instead of expiring, remove these when we disconnect observedAddrs ObservedAddrSet + + // NAT status + natStatus pb.Identify_NATStatus } // NewIDService constructs a new *IDService and activates it by @@ -163,6 +167,21 @@ func (ids *IDService) Push() { } } +func (ids *IDService) SetNatStatus(status autonat.NATStatus) { + switch status { + case autonat.NATStatusPrivate: + ids.natStatus = pb.Identify_NATStatusPrivate + case autonat.NATStatusPublic: + ids.natStatus = pb.Identify_NATStatusPublic + default: + ids.natStatus = pb.Identify_NATStatusUnknown + } +} + +func (ids *IDService) GetNatStatus() pb.Identify_NATStatus { + return ids.natStatus +} + func (ids *IDService) populateMessage(mes *pb.Identify, c inet.Conn) { // set protocols this node is currently handling @@ -201,6 +220,9 @@ func (ids *IDService) populateMessage(mes *pb.Identify, c inet.Conn) { av := ClientVersion mes.ProtocolVersion = &pv mes.AgentVersion = &av + + // set if behind NAT when possible + mes.NatStatus = &ids.natStatus } func (ids *IDService) consumeMessage(mes *pb.Identify, c inet.Conn) { @@ -209,6 +231,8 @@ func (ids *IDService) consumeMessage(mes *pb.Identify, c inet.Conn) { // mes.Protocols ids.Host.Peerstore().SetProtocols(p, mes.Protocols...) + ids.Host.Peerstore().Put(p, "natStatus", mes.GetNatStatus()) + // mes.ObservedAddr ids.consumeObservedAddress(mes.GetObservedAddr(), c) diff --git a/p2p/protocol/identify/pb/identify.pb.go b/p2p/protocol/identify/pb/identify.pb.go index 774ab2b89c6374196dca466ff34b8ac462b0aa70..02813f56c3398af6e976d3cffa5467f15d634ad3 100644 --- a/p2p/protocol/identify/pb/identify.pb.go +++ b/p2p/protocol/identify/pb/identify.pb.go @@ -20,6 +20,45 @@ var _ = math.Inf // proto package needs to be updated. const _ = proto.GoGoProtoPackageIsVersion2 // please upgrade the proto package +type Identify_NATStatus int32 + +const ( + Identify_NATStatusUnknown Identify_NATStatus = 0 + Identify_NATStatusPublic Identify_NATStatus = 1 + Identify_NATStatusPrivate Identify_NATStatus = 2 +) + +var Identify_NATStatus_name = map[int32]string{ + 0: "NATStatusUnknown", + 1: "NATStatusPublic", + 2: "NATStatusPrivate", +} +var Identify_NATStatus_value = map[string]int32{ + "NATStatusUnknown": 0, + "NATStatusPublic": 1, + "NATStatusPrivate": 2, +} + +func (x Identify_NATStatus) Enum() *Identify_NATStatus { + p := new(Identify_NATStatus) + *p = x + return p +} +func (x Identify_NATStatus) String() string { + return proto.EnumName(Identify_NATStatus_name, int32(x)) +} +func (x *Identify_NATStatus) UnmarshalJSON(data []byte) error { + value, err := proto.UnmarshalJSONEnum(Identify_NATStatus_value, data, "Identify_NATStatus") + if err != nil { + return err + } + *x = Identify_NATStatus(value) + return nil +} +func (Identify_NATStatus) EnumDescriptor() ([]byte, []int) { + return fileDescriptor_identify_abe757650f0e8336, []int{0, 0} +} + type Identify struct { // protocolVersion determines compatibility between peers ProtocolVersion *string `protobuf:"bytes,5,opt,name=protocolVersion" json:"protocolVersion,omitempty"` @@ -37,17 +76,18 @@ type Identify struct { // determine whether its connection to the local peer goes through NAT. ObservedAddr []byte `protobuf:"bytes,4,opt,name=observedAddr" json:"observedAddr,omitempty"` // protocols are the services this node is running - Protocols []string `protobuf:"bytes,3,rep,name=protocols" json:"protocols,omitempty"` - XXX_NoUnkeyedLiteral struct{} `json:"-"` - XXX_unrecognized []byte `json:"-"` - XXX_sizecache int32 `json:"-"` + Protocols []string `protobuf:"bytes,3,rep,name=protocols" json:"protocols,omitempty"` + NatStatus *Identify_NATStatus `protobuf:"varint,7,opt,name=natStatus,enum=identify.pb.Identify_NATStatus" json:"natStatus,omitempty"` + XXX_NoUnkeyedLiteral struct{} `json:"-"` + XXX_unrecognized []byte `json:"-"` + XXX_sizecache int32 `json:"-"` } func (m *Identify) Reset() { *m = Identify{} } func (m *Identify) String() string { return proto.CompactTextString(m) } func (*Identify) ProtoMessage() {} func (*Identify) Descriptor() ([]byte, []int) { - return fileDescriptor_identify_daaec8baf46eae80, []int{0} + return fileDescriptor_identify_abe757650f0e8336, []int{0} } func (m *Identify) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -118,8 +158,16 @@ func (m *Identify) GetProtocols() []string { return nil } +func (m *Identify) GetNatStatus() Identify_NATStatus { + if m != nil && m.NatStatus != nil { + return *m.NatStatus + } + return Identify_NATStatusUnknown +} + func init() { proto.RegisterType((*Identify)(nil), "identify.pb.Identify") + proto.RegisterEnum("identify.pb.Identify_NATStatus", Identify_NATStatus_name, Identify_NATStatus_value) } func (m *Identify) Marshal() (dAtA []byte, err error) { size := m.Size() @@ -183,6 +231,11 @@ func (m *Identify) MarshalTo(dAtA []byte) (int, error) { i = encodeVarintIdentify(dAtA, i, uint64(len(*m.AgentVersion))) i += copy(dAtA[i:], *m.AgentVersion) } + if m.NatStatus != nil { + dAtA[i] = 0x38 + i++ + i = encodeVarintIdentify(dAtA, i, uint64(*m.NatStatus)) + } if m.XXX_unrecognized != nil { i += copy(dAtA[i:], m.XXX_unrecognized) } @@ -199,6 +252,9 @@ func encodeVarintIdentify(dAtA []byte, offset int, v uint64) int { return offset + 1 } func (m *Identify) Size() (n int) { + if m == nil { + return 0 + } var l int _ = l if m.PublicKey != nil { @@ -229,6 +285,9 @@ func (m *Identify) Size() (n int) { l = len(*m.AgentVersion) n += 1 + l + sovIdentify(uint64(l)) } + if m.NatStatus != nil { + n += 1 + sovIdentify(uint64(*m.NatStatus)) + } if m.XXX_unrecognized != nil { n += len(m.XXX_unrecognized) } @@ -457,6 +516,26 @@ func (m *Identify) Unmarshal(dAtA []byte) error { s := string(dAtA[iNdEx:postIndex]) m.AgentVersion = &s iNdEx = postIndex + case 7: + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for field NatStatus", wireType) + } + var v Identify_NATStatus + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowIdentify + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + v |= (Identify_NATStatus(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + m.NatStatus = &v default: iNdEx = preIndex skippy, err := skipIdentify(dAtA[iNdEx:]) @@ -584,20 +663,25 @@ var ( ErrIntOverflowIdentify = fmt.Errorf("proto: integer overflow") ) -func init() { proto.RegisterFile("identify.proto", fileDescriptor_identify_daaec8baf46eae80) } +func init() { proto.RegisterFile("identify.proto", fileDescriptor_identify_abe757650f0e8336) } -var fileDescriptor_identify_daaec8baf46eae80 = []byte{ - // 187 bytes of a gzipped FileDescriptorProto - 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xe2, 0xe2, 0xcb, 0x4c, 0x49, 0xcd, - 0x2b, 0xc9, 0x4c, 0xab, 0xd4, 0x2b, 0x28, 0xca, 0x2f, 0xc9, 0x17, 0xe2, 0x46, 0xf0, 0x93, 0x94, - 0x6e, 0x31, 0x72, 0x71, 0x78, 0x42, 0xf9, 0x42, 0x32, 0x5c, 0x9c, 0x05, 0xa5, 0x49, 0x39, 0x99, - 0xc9, 0xde, 0xa9, 0x95, 0x12, 0x8c, 0x0a, 0x8c, 0x1a, 0x3c, 0x41, 0x08, 0x01, 0x21, 0x05, 0x2e, - 0xee, 0x9c, 0xcc, 0xe2, 0x92, 0xd4, 0x3c, 0xc7, 0x94, 0x94, 0xa2, 0x62, 0x09, 0x26, 0x05, 0x66, - 0x0d, 0x9e, 0x20, 0x64, 0x21, 0xb0, 0x7e, 0x90, 0x15, 0xc9, 0xf9, 0x39, 0xc5, 0x12, 0xcc, 0x0a, - 0xcc, 0x1a, 0x9c, 0x41, 0x08, 0x01, 0x21, 0x25, 0x2e, 0x9e, 0xfc, 0xa4, 0xe2, 0xd4, 0xa2, 0xb2, - 0xd4, 0x14, 0x90, 0x72, 0x09, 0x16, 0xb0, 0x05, 0x28, 0x62, 0x42, 0x1a, 0x5c, 0xfc, 0x30, 0x0d, - 0x61, 0xa9, 0x45, 0xc5, 0x99, 0xf9, 0x79, 0x12, 0xac, 0x0a, 0x8c, 0x1a, 0x9c, 0x41, 0xe8, 0xc2, - 0x20, 0xd3, 0x12, 0xd3, 0x53, 0xf3, 0x4a, 0x60, 0xca, 0xd8, 0xc0, 0xca, 0x50, 0xc4, 0x9c, 0x78, - 0x4e, 0x3c, 0x92, 0x63, 0xbc, 0xf0, 0x48, 0x8e, 0xf1, 0xc1, 0x23, 0x39, 0x46, 0x40, 0x00, 0x00, - 0x00, 0xff, 0xff, 0x4b, 0x9c, 0x90, 0x7a, 0x08, 0x01, 0x00, 0x00, +var fileDescriptor_identify_abe757650f0e8336 = []byte{ + // 262 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x5c, 0x90, 0xbb, 0x4e, 0xc3, 0x30, + 0x18, 0x85, 0x71, 0xc2, 0xcd, 0x7f, 0xa3, 0x36, 0x32, 0x0c, 0x1e, 0x50, 0xb0, 0x32, 0x79, 0xca, + 0xc0, 0xce, 0x50, 0x36, 0x04, 0x42, 0x95, 0xb9, 0xec, 0x49, 0x63, 0x90, 0x45, 0x64, 0x57, 0xb6, + 0x5b, 0xd4, 0x37, 0x64, 0xe4, 0x11, 0x50, 0x16, 0x5e, 0x03, 0x25, 0x90, 0xa4, 0xe9, 0x78, 0x3e, + 0x7d, 0xc7, 0xbf, 0x8e, 0x61, 0xaa, 0x4a, 0xa9, 0xbd, 0x7a, 0xdd, 0x66, 0x2b, 0x6b, 0xbc, 0x21, + 0x93, 0x21, 0x17, 0xe9, 0x4f, 0x00, 0xa7, 0xb7, 0xff, 0x99, 0x5c, 0x00, 0x5e, 0xad, 0x8b, 0x4a, + 0x2d, 0xef, 0xe4, 0x96, 0x22, 0x86, 0x78, 0x24, 0x06, 0x40, 0x18, 0x4c, 0x2a, 0xe5, 0xbc, 0xd4, + 0xf3, 0xb2, 0xb4, 0x8e, 0x06, 0x2c, 0xe4, 0x91, 0xd8, 0x45, 0x6d, 0xbf, 0x39, 0xb1, 0x34, 0x95, + 0xa3, 0x21, 0x0b, 0x39, 0x16, 0x03, 0x20, 0x29, 0x44, 0xa6, 0x70, 0xd2, 0x6e, 0x64, 0xd9, 0xe8, + 0xf4, 0xb0, 0x3d, 0x30, 0x62, 0x84, 0xc3, 0xac, 0x2b, 0xbc, 0x48, 0xeb, 0x94, 0xd1, 0xf4, 0x88, + 0x21, 0x8e, 0xc5, 0x3e, 0x6e, 0x5e, 0xcb, 0xdf, 0xa4, 0xf6, 0x9d, 0x76, 0xdc, 0x6a, 0x23, 0x46, + 0xae, 0x01, 0xeb, 0xdc, 0x3f, 0xfa, 0xdc, 0xaf, 0x1d, 0x3d, 0x61, 0x88, 0x4f, 0xaf, 0x2e, 0xb3, + 0x9d, 0xf5, 0x59, 0xb7, 0x3c, 0x7b, 0x98, 0x3f, 0xfd, 0x69, 0x62, 0x68, 0xa4, 0xf7, 0x80, 0x7b, + 0x4e, 0xce, 0x21, 0xee, 0xc3, 0xb3, 0x7e, 0xd7, 0xe6, 0x43, 0xc7, 0x07, 0xe4, 0x0c, 0x66, 0x3d, + 0x5d, 0xb4, 0x3f, 0x15, 0xa3, 0x91, 0xba, 0xb0, 0x6a, 0x93, 0x7b, 0x19, 0x07, 0x37, 0xd1, 0x67, + 0x9d, 0xa0, 0xaf, 0x3a, 0x41, 0xdf, 0x75, 0x82, 0x7e, 0x03, 0x00, 0x00, 0xff, 0xff, 0xca, 0x4d, + 0x42, 0xce, 0x95, 0x01, 0x00, 0x00, } diff --git a/p2p/protocol/identify/pb/identify.proto b/p2p/protocol/identify/pb/identify.proto index 5270c4cf5813c21e4dad0fff2b535de952449c61..45482f4c5da23ef056361837c8d7ecef15c52d61 100644 --- a/p2p/protocol/identify/pb/identify.proto +++ b/p2p/protocol/identify/pb/identify.proto @@ -26,4 +26,12 @@ message Identify { // protocols are the services this node is running repeated string protocols = 3; + + enum NATStatus { + NATStatusUnknown = 0; + NATStatusPublic = 1; + NATStatusPrivate = 2; + }; + + optional NATStatus natStatus = 7; } diff --git a/p2p/protocol/punching/pb/punching.pb.go b/p2p/protocol/punching/pb/punching.pb.go new file mode 100644 index 0000000000000000000000000000000000000000..c06fd344b89f24915718bcec34de0c9d73564c88 --- /dev/null +++ b/p2p/protocol/punching/pb/punching.pb.go @@ -0,0 +1,345 @@ +// Code generated by protoc-gen-gogo. DO NOT EDIT. +// source: punching.proto + +package punching_pb + +import proto "github.com/gogo/protobuf/proto" +import fmt "fmt" +import math "math" + +import io "io" + +// Reference imports to suppress errors if they are not otherwise used. +var _ = proto.Marshal +var _ = fmt.Errorf +var _ = math.Inf + +// This is a compile-time assertion to ensure that this generated file +// is compatible with the proto package it is being compiled against. +// A compilation error at this line likely means your copy of the +// proto package needs to be updated. +const _ = proto.GoGoProtoPackageIsVersion2 // please upgrade the proto package + +type Punching struct { + // addresses for peer to test connectivity + Addresses []string `protobuf:"bytes,1,rep,name=addresses" json:"addresses,omitempty"` + XXX_NoUnkeyedLiteral struct{} `json:"-"` + XXX_unrecognized []byte `json:"-"` + XXX_sizecache int32 `json:"-"` +} + +func (m *Punching) Reset() { *m = Punching{} } +func (m *Punching) String() string { return proto.CompactTextString(m) } +func (*Punching) ProtoMessage() {} +func (*Punching) Descriptor() ([]byte, []int) { + return fileDescriptor_punching_ccf7b972b2451d9b, []int{0} +} +func (m *Punching) XXX_Unmarshal(b []byte) error { + return m.Unmarshal(b) +} +func (m *Punching) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + if deterministic { + return xxx_messageInfo_Punching.Marshal(b, m, deterministic) + } else { + b = b[:cap(b)] + n, err := m.MarshalTo(b) + if err != nil { + return nil, err + } + return b[:n], nil + } +} +func (dst *Punching) XXX_Merge(src proto.Message) { + xxx_messageInfo_Punching.Merge(dst, src) +} +func (m *Punching) XXX_Size() int { + return m.Size() +} +func (m *Punching) XXX_DiscardUnknown() { + xxx_messageInfo_Punching.DiscardUnknown(m) +} + +var xxx_messageInfo_Punching proto.InternalMessageInfo + +func (m *Punching) GetAddresses() []string { + if m != nil { + return m.Addresses + } + return nil +} + +func init() { + proto.RegisterType((*Punching)(nil), "punching.pb.Punching") +} +func (m *Punching) Marshal() (dAtA []byte, err error) { + size := m.Size() + dAtA = make([]byte, size) + n, err := m.MarshalTo(dAtA) + if err != nil { + return nil, err + } + return dAtA[:n], nil +} + +func (m *Punching) MarshalTo(dAtA []byte) (int, error) { + var i int + _ = i + var l int + _ = l + if len(m.Addresses) > 0 { + for _, s := range m.Addresses { + dAtA[i] = 0xa + i++ + l = len(s) + for l >= 1<<7 { + dAtA[i] = uint8(uint64(l)&0x7f | 0x80) + l >>= 7 + i++ + } + dAtA[i] = uint8(l) + i++ + i += copy(dAtA[i:], s) + } + } + if m.XXX_unrecognized != nil { + i += copy(dAtA[i:], m.XXX_unrecognized) + } + return i, nil +} + +func encodeVarintPunching(dAtA []byte, offset int, v uint64) int { + for v >= 1<<7 { + dAtA[offset] = uint8(v&0x7f | 0x80) + v >>= 7 + offset++ + } + dAtA[offset] = uint8(v) + return offset + 1 +} +func (m *Punching) Size() (n int) { + if m == nil { + return 0 + } + var l int + _ = l + if len(m.Addresses) > 0 { + for _, s := range m.Addresses { + l = len(s) + n += 1 + l + sovPunching(uint64(l)) + } + } + if m.XXX_unrecognized != nil { + n += len(m.XXX_unrecognized) + } + return n +} + +func sovPunching(x uint64) (n int) { + for { + n++ + x >>= 7 + if x == 0 { + break + } + } + return n +} +func sozPunching(x uint64) (n int) { + return sovPunching(uint64((x << 1) ^ uint64((int64(x) >> 63)))) +} +func (m *Punching) Unmarshal(dAtA []byte) error { + l := len(dAtA) + iNdEx := 0 + for iNdEx < l { + preIndex := iNdEx + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowPunching + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + wire |= (uint64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + fieldNum := int32(wire >> 3) + wireType := int(wire & 0x7) + if wireType == 4 { + return fmt.Errorf("proto: Punching: wiretype end group for non-group") + } + if fieldNum <= 0 { + return fmt.Errorf("proto: Punching: illegal tag %d (wire type %d)", fieldNum, wire) + } + switch fieldNum { + case 1: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Addresses", wireType) + } + var stringLen uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowPunching + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + stringLen |= (uint64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + intStringLen := int(stringLen) + if intStringLen < 0 { + return ErrInvalidLengthPunching + } + postIndex := iNdEx + intStringLen + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.Addresses = append(m.Addresses, string(dAtA[iNdEx:postIndex])) + iNdEx = postIndex + default: + iNdEx = preIndex + skippy, err := skipPunching(dAtA[iNdEx:]) + if err != nil { + return err + } + if skippy < 0 { + return ErrInvalidLengthPunching + } + if (iNdEx + skippy) > l { + return io.ErrUnexpectedEOF + } + m.XXX_unrecognized = append(m.XXX_unrecognized, dAtA[iNdEx:iNdEx+skippy]...) + iNdEx += skippy + } + } + + if iNdEx > l { + return io.ErrUnexpectedEOF + } + return nil +} +func skipPunching(dAtA []byte) (n int, err error) { + l := len(dAtA) + iNdEx := 0 + for iNdEx < l { + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return 0, ErrIntOverflowPunching + } + if iNdEx >= l { + return 0, io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + wire |= (uint64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + wireType := int(wire & 0x7) + switch wireType { + case 0: + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return 0, ErrIntOverflowPunching + } + if iNdEx >= l { + return 0, io.ErrUnexpectedEOF + } + iNdEx++ + if dAtA[iNdEx-1] < 0x80 { + break + } + } + return iNdEx, nil + case 1: + iNdEx += 8 + return iNdEx, nil + case 2: + var length int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return 0, ErrIntOverflowPunching + } + if iNdEx >= l { + return 0, io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + length |= (int(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + iNdEx += length + if length < 0 { + return 0, ErrInvalidLengthPunching + } + return iNdEx, nil + case 3: + for { + var innerWire uint64 + var start int = iNdEx + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return 0, ErrIntOverflowPunching + } + if iNdEx >= l { + return 0, io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + innerWire |= (uint64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + innerWireType := int(innerWire & 0x7) + if innerWireType == 4 { + break + } + next, err := skipPunching(dAtA[start:]) + if err != nil { + return 0, err + } + iNdEx = start + next + } + return iNdEx, nil + case 4: + return iNdEx, nil + case 5: + iNdEx += 4 + return iNdEx, nil + default: + return 0, fmt.Errorf("proto: illegal wireType %d", wireType) + } + } + panic("unreachable") +} + +var ( + ErrInvalidLengthPunching = fmt.Errorf("proto: negative length found during unmarshaling") + ErrIntOverflowPunching = fmt.Errorf("proto: integer overflow") +) + +func init() { proto.RegisterFile("punching.proto", fileDescriptor_punching_ccf7b972b2451d9b) } + +var fileDescriptor_punching_ccf7b972b2451d9b = []byte{ + // 88 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xe2, 0xe2, 0x2b, 0x28, 0xcd, 0x4b, + 0xce, 0xc8, 0xcc, 0x4b, 0xd7, 0x2b, 0x28, 0xca, 0x2f, 0xc9, 0x17, 0xe2, 0x46, 0xf0, 0x93, 0x94, + 0x34, 0xb8, 0x38, 0x02, 0xa0, 0x5c, 0x21, 0x19, 0x2e, 0xce, 0xc4, 0x94, 0x94, 0xa2, 0xd4, 0xe2, + 0xe2, 0xd4, 0x62, 0x09, 0x46, 0x05, 0x66, 0x0d, 0xce, 0x20, 0x84, 0x80, 0x13, 0xcf, 0x89, 0x47, + 0x72, 0x8c, 0x17, 0x1e, 0xc9, 0x31, 0x3e, 0x78, 0x24, 0xc7, 0x08, 0x08, 0x00, 0x00, 0xff, 0xff, + 0x41, 0xc4, 0x5a, 0xaf, 0x55, 0x00, 0x00, 0x00, +} diff --git a/p2p/protocol/punching/pb/punching.proto b/p2p/protocol/punching/pb/punching.proto new file mode 100644 index 0000000000000000000000000000000000000000..de920209bcf2cdc3788ff9136a15b94a81856337 --- /dev/null +++ b/p2p/protocol/punching/pb/punching.proto @@ -0,0 +1,8 @@ +syntax = "proto2"; + +package punching.pb; + +message Punching { + // addresses for peer to test connectivity + repeated string addresses = 1; +} \ No newline at end of file diff --git a/p2p/protocol/punching/punching.go b/p2p/protocol/punching/punching.go new file mode 100644 index 0000000000000000000000000000000000000000..b5aebc876ccdc7d80c6a27dd62778a9620d311de --- /dev/null +++ b/p2p/protocol/punching/punching.go @@ -0,0 +1,178 @@ +package punching + +import ( + "context" + "strings" + "time" + + ggio "github.com/gogo/protobuf/io" + logging "github.com/ipfs/go-log" + host "github.com/libp2p/go-libp2p-host" + inet "github.com/libp2p/go-libp2p-net" + peer "github.com/libp2p/go-libp2p-peer" + pstore "github.com/libp2p/go-libp2p-peerstore" + swarm "github.com/libp2p/go-libp2p-swarm" + identify "github.com/libp2p/go-libp2p/p2p/protocol/identify" + identify_pb "github.com/libp2p/go-libp2p/p2p/protocol/identify/pb" + punching_pb "github.com/libp2p/go-libp2p/p2p/protocol/punching/pb" + ma "github.com/multiformats/go-multiaddr" +) + +var log = logging.Logger("punching") + +const PunchingProtocol = "/libp2p/punching/1.0.0" + +type PunchingService struct { + host host.Host + idService *identify.IDService +} + +func NewPunchingService(h host.Host, idService *identify.IDService) *PunchingService { + punchingService := &PunchingService{ + host: h, + idService: idService, + } + h.SetStreamHandler(PunchingProtocol, punchingService.PunchingHandler) + h.Network().Notify(punchingService) + return punchingService +} + +func (p *PunchingService) PunchingHandler(s inet.Stream) { + log.Debug("Incoming Punching Request", s.Protocol(), s.Conn().RemoteMultiaddr().String()) + + r := ggio.NewDelimitedReader(s, inet.MessageSizeMax) + var incoming punching_pb.Punching + err := r.ReadMsg(&incoming) + if err != nil { + log.Debug("err read punching_pb.Punching:", err) + return + } + log.Debug("Incoming peer addresses:", incoming.Addresses) + if len(incoming.Addresses) == 0 { + return + } + + go func() { + maddrs := make([]ma.Multiaddr, 0, len(incoming.Addresses)) + for _, a := range incoming.Addresses { + maddr, err := ma.NewMultiaddr(a) + if err != nil { + log.Debug("Cannot convert", a, "to multiaddress:", err) + continue + } + maddrs = append(maddrs, maddr) + } + if len(maddrs) == 0 { + return + } + peerInfo := pstore.PeerInfo{ + ID: s.Conn().RemotePeer(), + Addrs: maddrs, + } + + timeCount := 0 + for timeCount < 5 { + timeCount += 1 + + // copied from go-ipfs/core/coreapi/swarm + if swrm, ok := p.host.Network().(*swarm.Swarm); ok { + swrm.Backoff().Clear(peerInfo.ID) + } + ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) + err := p.host.Connect(ctx, peerInfo) + cancel() + if err != nil { + log.Info("Failed to connect", peerInfo.ID, "err:", err) + } else { + log.Info("Punch-connect to", incoming.Addresses, "successful") + return + } + time.Sleep(10 * time.Second) + } + }() +} + +func (p *PunchingService) punch(peerID peer.ID) { + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + stream, err := p.host.NewStream(ctx, peerID, PunchingProtocol) + cancel() + if err != nil { + log.Debug("Failed to create new stream to", peerID) + return + } + allAddresses := p.host.Addrs() + quicAddress := make([]string, 0, len(allAddresses)) + for _, a := range allAddresses { + s := a.String() + if strings.Contains(s, "/quic") && + !strings.Contains(s, "/ip6") && + !strings.Contains(s, "/127.0.0.1") && + !strings.Contains(s, "/::") && + !strings.Contains(s, "/192.168.") && + !strings.Contains(s, "/10.") && + !strings.Contains(s, "/172.") { + quicAddress = append(quicAddress, s) + } + } + if len(quicAddress) == 0 { + return + } + log.Debug("QUIC addresses:", quicAddress) + msg := punching_pb.Punching{ + Addresses: quicAddress, + } + + w := ggio.NewDelimitedWriter(stream) + err = w.WriteMsg(&msg) + if err != nil { + log.Debug("err write punching_pb.Punching:", err) + return + } +} + +// +// Implement inet.Notifiee +// +func (p *PunchingService) Listen(inet.Network, ma.Multiaddr) {} // called when network starts listening on an addr +func (p *PunchingService) ListenClose(inet.Network, ma.Multiaddr) {} // called when network starts listening on an addr +func (p *PunchingService) Connected(net inet.Network, conn inet.Conn) { // called when a connection opened + go func() { + time.Sleep(5 * time.Second) + log.Debug("Connected", conn.LocalMultiaddr().String(), "<==>", conn.RemoteMultiaddr().String()) + + peerID := conn.RemotePeer() + protos, err := p.host.Peerstore().SupportsProtocols(peerID, PunchingProtocol) + if err != nil { + log.Debug("error retrieving supported protocols for peer %s: %s\n", peerID, err) + return + } + + if len(protos) > 0 { + selfNatStatus := p.idService.GetNatStatus() + var peerNatStatus identify_pb.Identify_NATStatus + peerNatStatusRaw, err := p.host.Peerstore().Get(peerID, "natStatus") + if err == nil { + peerNatStatus = peerNatStatusRaw.(identify_pb.Identify_NATStatus) + } + log.Info("Discovered punching-support peer", peerID.String(), + "self NAT status", selfNatStatus, + "peer NAT status", peerNatStatus) + if selfNatStatus == identify_pb.Identify_NATStatusPublic || + peerNatStatus == identify_pb.Identify_NATStatusPublic { + return + } + + p.punch(peerID) + } + }() +} +func (p *PunchingService) Disconnected(inet.Network, inet.Conn) {} // called when a connection closed +func (p *PunchingService) OpenedStream(net inet.Network, stream inet.Stream) { // called when a stream opened + go func() { + time.Sleep(5 * time.Second) + log.Debug("Connected", + stream.Conn().LocalMultiaddr().String(), "<==>", stream.Conn().RemoteMultiaddr().String(), + "protocol", stream.Protocol()) + }() +} +func (p *PunchingService) ClosedStream(inet.Network, inet.Stream) {} // called when a stream closed