diff --git a/p2p/host/basic/basic_host.go b/p2p/host/basic/basic_host.go index b2bf0e341f2cddd5e0a0951c47e227186f1e3981..06248360e822f31647140bb0c1bce771837ffd2f 100644 --- a/p2p/host/basic/basic_host.go +++ b/p2p/host/basic/basic_host.go @@ -104,6 +104,9 @@ func New(net inet.Network, opts ...interface{}) *BasicHost { // newConnHandler is the remote-opened conn handler for inet.Network func (h *BasicHost) newConnHandler(c inet.Conn) { + // Clear protocols on connecting to new peer to avoid issues caused + // by misremembering protocols between reconnects + h.Peerstore().SetProtocols(c.RemotePeer()) h.ids.IdentifyConn(c) } @@ -120,7 +123,7 @@ func (h *BasicHost) newStreamHandler(s inet.Stream) { } } - protoID, handle, err := h.Mux().Negotiate(s) + lzc, protoID, handle, err := h.Mux().NegotiateLazy(s) took := time.Now().Sub(before) if err != nil { if err == io.EOF { @@ -136,6 +139,11 @@ func (h *BasicHost) newStreamHandler(s inet.Stream) { return } + s = &streamWrapper{ + Stream: s, + rw: lzc, + } + if h.NegotiateTimeout != 0 { if err := s.SetDeadline(time.Time{}); err != nil { log.Error("resetting stream deadline: ", err) @@ -316,6 +324,10 @@ func (h *BasicHost) dialPeer(ctx context.Context, p peer.ID) error { return err } + // Clear protocols on connecting to new peer to avoid issues caused + // by misremembering protocols between reconnects + h.Peerstore().SetProtocols(p) + // identify the connection before returning. done := make(chan struct{}) go func() { diff --git a/p2p/host/basic/basic_host_test.go b/p2p/host/basic/basic_host_test.go index caf2d8f7bcd0b12442fc70e2ed34ac1174d30afa..57d7c2edd762477014a5eb5a3816b0bad9d29878 100644 --- a/p2p/host/basic/basic_host_test.go +++ b/p2p/host/basic/basic_host_test.go @@ -98,7 +98,7 @@ func TestHostProtoPreference(t *testing.T) { protoNew := protocol.ID("/testing/1.1.0") protoMinor := protocol.ID("/testing/1.2.0") - connectedOn := make(chan protocol.ID, 16) + connectedOn := make(chan protocol.ID) handler := func(s inet.Stream) { connectedOn <- s.Protocol() @@ -173,7 +173,7 @@ func TestHostProtoPreknowledge(t *testing.T) { h1 := New(testutil.GenSwarmNetwork(t, ctx)) h2 := New(testutil.GenSwarmNetwork(t, ctx)) - conn := make(chan protocol.ID, 16) + conn := make(chan protocol.ID) handler := func(s inet.Stream) { conn <- s.Protocol() s.Close() @@ -189,7 +189,17 @@ func TestHostProtoPreknowledge(t *testing.T) { defer h2.Close() // wait for identify handshake to finish completely - time.Sleep(time.Millisecond * 20) + select { + case <-h1.ids.IdentifyWait(h1.Network().ConnsToPeer(h2.ID())[0]): + case <-time.After(time.Second * 5): + t.Fatal("timed out waiting for identify") + } + + select { + case <-h2.ids.IdentifyWait(h2.Network().ConnsToPeer(h1.ID())[0]): + case <-time.After(time.Second * 5): + t.Fatal("timed out waiting for identify") + } h1.SetStreamHandler("/foo", handler) @@ -222,7 +232,7 @@ func TestNewDialOld(t *testing.T) { defer h1.Close() defer h2.Close() - connectedOn := make(chan protocol.ID, 16) + connectedOn := make(chan protocol.ID) h1.SetStreamHandler("/testing", func(s inet.Stream) { connectedOn <- s.Protocol() s.Close() @@ -250,7 +260,7 @@ func TestProtoDowngrade(t *testing.T) { defer h1.Close() defer h2.Close() - connectedOn := make(chan protocol.ID, 16) + connectedOn := make(chan protocol.ID) h1.SetStreamHandler("/testing/1.0.0", func(s inet.Stream) { connectedOn <- s.Protocol() s.Close() diff --git a/p2p/protocol/identify/id.go b/p2p/protocol/identify/id.go index 867664bbf0a9c319b1c86e0714a5bee396f36963..325ce8a7faf82940f0538009763a7cbac6d46436 100644 --- a/p2p/protocol/identify/id.go +++ b/p2p/protocol/identify/id.go @@ -100,8 +100,7 @@ func (ids *IDService) IdentifyConn(c inet.Conn) { // ok give the response to our handler. if err := msmux.SelectProtoOrFail(ID, s); err != nil { - log.Debugf("error writing stream header for %s", ID) - log.Event(context.TODO(), "IdentifyOpenFailed", c.RemotePeer()) + log.Event(context.TODO(), "IdentifyOpenFailed", c.RemotePeer(), logging.Metadata{"error": err}) return } @@ -113,7 +112,7 @@ func (ids *IDService) IdentifyConn(c inet.Conn) { ids.currmu.Unlock() if !found { - log.Debugf("IdentifyConn failed to find channel (programmer error) for %s", c) + log.Errorf("IdentifyConn failed to find channel (programmer error) for %s", c) return } } @@ -142,6 +141,7 @@ func (ids *IDService) ResponseHandler(s inet.Stream) { r := ggio.NewDelimitedReader(s, 2048) mes := pb.Identify{} if err := r.ReadMsg(&mes); err != nil { + log.Warning("error reading identify message: ", err) return } ids.consumeMessage(&mes, c) diff --git a/package.json b/package.json index b0013e5218de0a7a054bb9550869179cbc2a444f..56dd8efbeb4f5c2628e2da5fc51705484ce1644d 100644 --- a/package.json +++ b/package.json @@ -34,9 +34,9 @@ "version": "1.0.0" }, { - "hash": "QmatJnBK2qyjcy1AYq4Gb5YH16YM7uibdteQ589r46YLvB", + "hash": "QmcpkzwqeqEnMgrmR4E48Ex52fcQU7eJz11gXk3qEit95q", "name": "go-multistream", - "version": "0.3.1" + "version": "0.3.5" }, { "hash": "QmQHGMVmrsgmqUG8ih3puNXUJneSpi13dkcZpzLKkskUkH", @@ -187,9 +187,9 @@ }, { "author": "whyrusleeping", - "hash": "QmQKfJb2rWd7vCzf38VEx9NQWUkuqfd5KuGo6eZEtQLNcQ", + "hash": "QmdEajEhjdowKAsrZiaZBWnZkdREjT32UZAsfhAfcmonr3", "name": "go-libp2p-conn", - "version": "1.5.1" + "version": "1.5.4" }, { "author": "whyrusleeping", @@ -211,15 +211,15 @@ }, { "author": "whyrusleeping", - "hash": "QmPTGbC34bPKaUm9wTxBo7zSCac7pDuG42ZmnXC718CKZZ", + "hash": "Qmb55YjV8z4PNChVKZM5rehSFLtgDURmDZ8d3DdkvBBZR1", "name": "go-libp2p-host", - "version": "1.3.1" + "version": "1.3.4" }, { "author": "whyrusleeping", - "hash": "QmWfxnAiQ5TnnCgiX9ikVUKFNHRgGhbgKdx5DoKPELD7P4", + "hash": "QmfSEkiXoCxTkRajpiHiyruDGSGpX9n6NivdFQJWBGVCQM", "name": "go-libp2p-swarm", - "version": "1.6.1" + "version": "1.6.4" }, { "author": "whyrusleeping", @@ -229,15 +229,15 @@ }, { "author": "whyrusleeping", - "hash": "QmWdGJY4fcsfhLHucEfivw8J71yUqNUFbzdU1jnJBnN5Xh", + "hash": "QmUcaGGriHNMtx7udfjUFQ7yDVq8zopYRDey5Lj68z5DZQ", "name": "go-libp2p-netutil", - "version": "0.2.1" + "version": "0.2.4" }, { "author": "whyrusleeping", - "hash": "QmSzhYTPRvh5nUJnRfYBW52QGX6jekULCRQcrxRs8hmzj4", + "hash": "QmRg8JoySAwWunyjZwwo8fbm5ysw3vv3ggSHcmw8KXzk6f", "name": "go-libp2p-blankhost", - "version": "0.1.1" + "version": "0.1.4" } ], "gxVersion": "0.4.0",