From 104c97ed5750b9c82612ee342a1cf2a3ccfbdf42 Mon Sep 17 00:00:00 2001 From: Jeromy Date: Wed, 4 Jan 2017 12:24:15 -0800 Subject: [PATCH] update go-multistream and use negotiateLazy to allow for readonly streams --- p2p/host/basic/basic_host.go | 14 +++++++++++++- p2p/host/basic/basic_host_test.go | 20 +++++++++++++++----- p2p/protocol/identify/id.go | 6 +++--- package.json | 24 ++++++++++++------------ 4 files changed, 43 insertions(+), 21 deletions(-) diff --git a/p2p/host/basic/basic_host.go b/p2p/host/basic/basic_host.go index b2bf0e3..0624836 100644 --- a/p2p/host/basic/basic_host.go +++ b/p2p/host/basic/basic_host.go @@ -104,6 +104,9 @@ func New(net inet.Network, opts ...interface{}) *BasicHost { // newConnHandler is the remote-opened conn handler for inet.Network func (h *BasicHost) newConnHandler(c inet.Conn) { + // Clear protocols on connecting to new peer to avoid issues caused + // by misremembering protocols between reconnects + h.Peerstore().SetProtocols(c.RemotePeer()) h.ids.IdentifyConn(c) } @@ -120,7 +123,7 @@ func (h *BasicHost) newStreamHandler(s inet.Stream) { } } - protoID, handle, err := h.Mux().Negotiate(s) + lzc, protoID, handle, err := h.Mux().NegotiateLazy(s) took := time.Now().Sub(before) if err != nil { if err == io.EOF { @@ -136,6 +139,11 @@ func (h *BasicHost) newStreamHandler(s inet.Stream) { return } + s = &streamWrapper{ + Stream: s, + rw: lzc, + } + if h.NegotiateTimeout != 0 { if err := s.SetDeadline(time.Time{}); err != nil { log.Error("resetting stream deadline: ", err) @@ -316,6 +324,10 @@ func (h *BasicHost) dialPeer(ctx context.Context, p peer.ID) error { return err } + // Clear protocols on connecting to new peer to avoid issues caused + // by misremembering protocols between reconnects + h.Peerstore().SetProtocols(p) + // identify the connection before returning. done := make(chan struct{}) go func() { diff --git a/p2p/host/basic/basic_host_test.go b/p2p/host/basic/basic_host_test.go index caf2d8f..57d7c2e 100644 --- a/p2p/host/basic/basic_host_test.go +++ b/p2p/host/basic/basic_host_test.go @@ -98,7 +98,7 @@ func TestHostProtoPreference(t *testing.T) { protoNew := protocol.ID("/testing/1.1.0") protoMinor := protocol.ID("/testing/1.2.0") - connectedOn := make(chan protocol.ID, 16) + connectedOn := make(chan protocol.ID) handler := func(s inet.Stream) { connectedOn <- s.Protocol() @@ -173,7 +173,7 @@ func TestHostProtoPreknowledge(t *testing.T) { h1 := New(testutil.GenSwarmNetwork(t, ctx)) h2 := New(testutil.GenSwarmNetwork(t, ctx)) - conn := make(chan protocol.ID, 16) + conn := make(chan protocol.ID) handler := func(s inet.Stream) { conn <- s.Protocol() s.Close() @@ -189,7 +189,17 @@ func TestHostProtoPreknowledge(t *testing.T) { defer h2.Close() // wait for identify handshake to finish completely - time.Sleep(time.Millisecond * 20) + select { + case <-h1.ids.IdentifyWait(h1.Network().ConnsToPeer(h2.ID())[0]): + case <-time.After(time.Second * 5): + t.Fatal("timed out waiting for identify") + } + + select { + case <-h2.ids.IdentifyWait(h2.Network().ConnsToPeer(h1.ID())[0]): + case <-time.After(time.Second * 5): + t.Fatal("timed out waiting for identify") + } h1.SetStreamHandler("/foo", handler) @@ -222,7 +232,7 @@ func TestNewDialOld(t *testing.T) { defer h1.Close() defer h2.Close() - connectedOn := make(chan protocol.ID, 16) + connectedOn := make(chan protocol.ID) h1.SetStreamHandler("/testing", func(s inet.Stream) { connectedOn <- s.Protocol() s.Close() @@ -250,7 +260,7 @@ func TestProtoDowngrade(t *testing.T) { defer h1.Close() defer h2.Close() - connectedOn := make(chan protocol.ID, 16) + connectedOn := make(chan protocol.ID) h1.SetStreamHandler("/testing/1.0.0", func(s inet.Stream) { connectedOn <- s.Protocol() s.Close() diff --git a/p2p/protocol/identify/id.go b/p2p/protocol/identify/id.go index 867664b..325ce8a 100644 --- a/p2p/protocol/identify/id.go +++ b/p2p/protocol/identify/id.go @@ -100,8 +100,7 @@ func (ids *IDService) IdentifyConn(c inet.Conn) { // ok give the response to our handler. if err := msmux.SelectProtoOrFail(ID, s); err != nil { - log.Debugf("error writing stream header for %s", ID) - log.Event(context.TODO(), "IdentifyOpenFailed", c.RemotePeer()) + log.Event(context.TODO(), "IdentifyOpenFailed", c.RemotePeer(), logging.Metadata{"error": err}) return } @@ -113,7 +112,7 @@ func (ids *IDService) IdentifyConn(c inet.Conn) { ids.currmu.Unlock() if !found { - log.Debugf("IdentifyConn failed to find channel (programmer error) for %s", c) + log.Errorf("IdentifyConn failed to find channel (programmer error) for %s", c) return } } @@ -142,6 +141,7 @@ func (ids *IDService) ResponseHandler(s inet.Stream) { r := ggio.NewDelimitedReader(s, 2048) mes := pb.Identify{} if err := r.ReadMsg(&mes); err != nil { + log.Warning("error reading identify message: ", err) return } ids.consumeMessage(&mes, c) diff --git a/package.json b/package.json index b0013e5..56dd8ef 100644 --- a/package.json +++ b/package.json @@ -34,9 +34,9 @@ "version": "1.0.0" }, { - "hash": "QmatJnBK2qyjcy1AYq4Gb5YH16YM7uibdteQ589r46YLvB", + "hash": "QmcpkzwqeqEnMgrmR4E48Ex52fcQU7eJz11gXk3qEit95q", "name": "go-multistream", - "version": "0.3.1" + "version": "0.3.5" }, { "hash": "QmQHGMVmrsgmqUG8ih3puNXUJneSpi13dkcZpzLKkskUkH", @@ -187,9 +187,9 @@ }, { "author": "whyrusleeping", - "hash": "QmQKfJb2rWd7vCzf38VEx9NQWUkuqfd5KuGo6eZEtQLNcQ", + "hash": "QmdEajEhjdowKAsrZiaZBWnZkdREjT32UZAsfhAfcmonr3", "name": "go-libp2p-conn", - "version": "1.5.1" + "version": "1.5.4" }, { "author": "whyrusleeping", @@ -211,15 +211,15 @@ }, { "author": "whyrusleeping", - "hash": "QmPTGbC34bPKaUm9wTxBo7zSCac7pDuG42ZmnXC718CKZZ", + "hash": "Qmb55YjV8z4PNChVKZM5rehSFLtgDURmDZ8d3DdkvBBZR1", "name": "go-libp2p-host", - "version": "1.3.1" + "version": "1.3.4" }, { "author": "whyrusleeping", - "hash": "QmWfxnAiQ5TnnCgiX9ikVUKFNHRgGhbgKdx5DoKPELD7P4", + "hash": "QmfSEkiXoCxTkRajpiHiyruDGSGpX9n6NivdFQJWBGVCQM", "name": "go-libp2p-swarm", - "version": "1.6.1" + "version": "1.6.4" }, { "author": "whyrusleeping", @@ -229,15 +229,15 @@ }, { "author": "whyrusleeping", - "hash": "QmWdGJY4fcsfhLHucEfivw8J71yUqNUFbzdU1jnJBnN5Xh", + "hash": "QmUcaGGriHNMtx7udfjUFQ7yDVq8zopYRDey5Lj68z5DZQ", "name": "go-libp2p-netutil", - "version": "0.2.1" + "version": "0.2.4" }, { "author": "whyrusleeping", - "hash": "QmSzhYTPRvh5nUJnRfYBW52QGX6jekULCRQcrxRs8hmzj4", + "hash": "QmRg8JoySAwWunyjZwwo8fbm5ysw3vv3ggSHcmw8KXzk6f", "name": "go-libp2p-blankhost", - "version": "0.1.1" + "version": "0.1.4" } ], "gxVersion": "0.4.0", -- GitLab