Commit 104c97ed authored by Jeromy's avatar Jeromy
Browse files

update go-multistream and use negotiateLazy to allow for readonly streams

parent c0f09cbe
...@@ -104,6 +104,9 @@ func New(net inet.Network, opts ...interface{}) *BasicHost { ...@@ -104,6 +104,9 @@ func New(net inet.Network, opts ...interface{}) *BasicHost {
// newConnHandler is the remote-opened conn handler for inet.Network // newConnHandler is the remote-opened conn handler for inet.Network
func (h *BasicHost) newConnHandler(c inet.Conn) { func (h *BasicHost) newConnHandler(c inet.Conn) {
// Clear protocols on connecting to new peer to avoid issues caused
// by misremembering protocols between reconnects
h.Peerstore().SetProtocols(c.RemotePeer())
h.ids.IdentifyConn(c) h.ids.IdentifyConn(c)
} }
...@@ -120,7 +123,7 @@ func (h *BasicHost) newStreamHandler(s inet.Stream) { ...@@ -120,7 +123,7 @@ func (h *BasicHost) newStreamHandler(s inet.Stream) {
} }
} }
protoID, handle, err := h.Mux().Negotiate(s) lzc, protoID, handle, err := h.Mux().NegotiateLazy(s)
took := time.Now().Sub(before) took := time.Now().Sub(before)
if err != nil { if err != nil {
if err == io.EOF { if err == io.EOF {
...@@ -136,6 +139,11 @@ func (h *BasicHost) newStreamHandler(s inet.Stream) { ...@@ -136,6 +139,11 @@ func (h *BasicHost) newStreamHandler(s inet.Stream) {
return return
} }
s = &streamWrapper{
Stream: s,
rw: lzc,
}
if h.NegotiateTimeout != 0 { if h.NegotiateTimeout != 0 {
if err := s.SetDeadline(time.Time{}); err != nil { if err := s.SetDeadline(time.Time{}); err != nil {
log.Error("resetting stream deadline: ", err) log.Error("resetting stream deadline: ", err)
...@@ -316,6 +324,10 @@ func (h *BasicHost) dialPeer(ctx context.Context, p peer.ID) error { ...@@ -316,6 +324,10 @@ func (h *BasicHost) dialPeer(ctx context.Context, p peer.ID) error {
return err return err
} }
// Clear protocols on connecting to new peer to avoid issues caused
// by misremembering protocols between reconnects
h.Peerstore().SetProtocols(p)
// identify the connection before returning. // identify the connection before returning.
done := make(chan struct{}) done := make(chan struct{})
go func() { go func() {
......
...@@ -98,7 +98,7 @@ func TestHostProtoPreference(t *testing.T) { ...@@ -98,7 +98,7 @@ func TestHostProtoPreference(t *testing.T) {
protoNew := protocol.ID("/testing/1.1.0") protoNew := protocol.ID("/testing/1.1.0")
protoMinor := protocol.ID("/testing/1.2.0") protoMinor := protocol.ID("/testing/1.2.0")
connectedOn := make(chan protocol.ID, 16) connectedOn := make(chan protocol.ID)
handler := func(s inet.Stream) { handler := func(s inet.Stream) {
connectedOn <- s.Protocol() connectedOn <- s.Protocol()
...@@ -173,7 +173,7 @@ func TestHostProtoPreknowledge(t *testing.T) { ...@@ -173,7 +173,7 @@ func TestHostProtoPreknowledge(t *testing.T) {
h1 := New(testutil.GenSwarmNetwork(t, ctx)) h1 := New(testutil.GenSwarmNetwork(t, ctx))
h2 := New(testutil.GenSwarmNetwork(t, ctx)) h2 := New(testutil.GenSwarmNetwork(t, ctx))
conn := make(chan protocol.ID, 16) conn := make(chan protocol.ID)
handler := func(s inet.Stream) { handler := func(s inet.Stream) {
conn <- s.Protocol() conn <- s.Protocol()
s.Close() s.Close()
...@@ -189,7 +189,17 @@ func TestHostProtoPreknowledge(t *testing.T) { ...@@ -189,7 +189,17 @@ func TestHostProtoPreknowledge(t *testing.T) {
defer h2.Close() defer h2.Close()
// wait for identify handshake to finish completely // wait for identify handshake to finish completely
time.Sleep(time.Millisecond * 20) select {
case <-h1.ids.IdentifyWait(h1.Network().ConnsToPeer(h2.ID())[0]):
case <-time.After(time.Second * 5):
t.Fatal("timed out waiting for identify")
}
select {
case <-h2.ids.IdentifyWait(h2.Network().ConnsToPeer(h1.ID())[0]):
case <-time.After(time.Second * 5):
t.Fatal("timed out waiting for identify")
}
h1.SetStreamHandler("/foo", handler) h1.SetStreamHandler("/foo", handler)
...@@ -222,7 +232,7 @@ func TestNewDialOld(t *testing.T) { ...@@ -222,7 +232,7 @@ func TestNewDialOld(t *testing.T) {
defer h1.Close() defer h1.Close()
defer h2.Close() defer h2.Close()
connectedOn := make(chan protocol.ID, 16) connectedOn := make(chan protocol.ID)
h1.SetStreamHandler("/testing", func(s inet.Stream) { h1.SetStreamHandler("/testing", func(s inet.Stream) {
connectedOn <- s.Protocol() connectedOn <- s.Protocol()
s.Close() s.Close()
...@@ -250,7 +260,7 @@ func TestProtoDowngrade(t *testing.T) { ...@@ -250,7 +260,7 @@ func TestProtoDowngrade(t *testing.T) {
defer h1.Close() defer h1.Close()
defer h2.Close() defer h2.Close()
connectedOn := make(chan protocol.ID, 16) connectedOn := make(chan protocol.ID)
h1.SetStreamHandler("/testing/1.0.0", func(s inet.Stream) { h1.SetStreamHandler("/testing/1.0.0", func(s inet.Stream) {
connectedOn <- s.Protocol() connectedOn <- s.Protocol()
s.Close() s.Close()
......
...@@ -100,8 +100,7 @@ func (ids *IDService) IdentifyConn(c inet.Conn) { ...@@ -100,8 +100,7 @@ func (ids *IDService) IdentifyConn(c inet.Conn) {
// ok give the response to our handler. // ok give the response to our handler.
if err := msmux.SelectProtoOrFail(ID, s); err != nil { if err := msmux.SelectProtoOrFail(ID, s); err != nil {
log.Debugf("error writing stream header for %s", ID) log.Event(context.TODO(), "IdentifyOpenFailed", c.RemotePeer(), logging.Metadata{"error": err})
log.Event(context.TODO(), "IdentifyOpenFailed", c.RemotePeer())
return return
} }
...@@ -113,7 +112,7 @@ func (ids *IDService) IdentifyConn(c inet.Conn) { ...@@ -113,7 +112,7 @@ func (ids *IDService) IdentifyConn(c inet.Conn) {
ids.currmu.Unlock() ids.currmu.Unlock()
if !found { if !found {
log.Debugf("IdentifyConn failed to find channel (programmer error) for %s", c) log.Errorf("IdentifyConn failed to find channel (programmer error) for %s", c)
return return
} }
} }
...@@ -142,6 +141,7 @@ func (ids *IDService) ResponseHandler(s inet.Stream) { ...@@ -142,6 +141,7 @@ func (ids *IDService) ResponseHandler(s inet.Stream) {
r := ggio.NewDelimitedReader(s, 2048) r := ggio.NewDelimitedReader(s, 2048)
mes := pb.Identify{} mes := pb.Identify{}
if err := r.ReadMsg(&mes); err != nil { if err := r.ReadMsg(&mes); err != nil {
log.Warning("error reading identify message: ", err)
return return
} }
ids.consumeMessage(&mes, c) ids.consumeMessage(&mes, c)
......
...@@ -34,9 +34,9 @@ ...@@ -34,9 +34,9 @@
"version": "1.0.0" "version": "1.0.0"
}, },
{ {
"hash": "QmatJnBK2qyjcy1AYq4Gb5YH16YM7uibdteQ589r46YLvB", "hash": "QmcpkzwqeqEnMgrmR4E48Ex52fcQU7eJz11gXk3qEit95q",
"name": "go-multistream", "name": "go-multistream",
"version": "0.3.1" "version": "0.3.5"
}, },
{ {
"hash": "QmQHGMVmrsgmqUG8ih3puNXUJneSpi13dkcZpzLKkskUkH", "hash": "QmQHGMVmrsgmqUG8ih3puNXUJneSpi13dkcZpzLKkskUkH",
...@@ -187,9 +187,9 @@ ...@@ -187,9 +187,9 @@
}, },
{ {
"author": "whyrusleeping", "author": "whyrusleeping",
"hash": "QmQKfJb2rWd7vCzf38VEx9NQWUkuqfd5KuGo6eZEtQLNcQ", "hash": "QmdEajEhjdowKAsrZiaZBWnZkdREjT32UZAsfhAfcmonr3",
"name": "go-libp2p-conn", "name": "go-libp2p-conn",
"version": "1.5.1" "version": "1.5.4"
}, },
{ {
"author": "whyrusleeping", "author": "whyrusleeping",
...@@ -211,15 +211,15 @@ ...@@ -211,15 +211,15 @@
}, },
{ {
"author": "whyrusleeping", "author": "whyrusleeping",
"hash": "QmPTGbC34bPKaUm9wTxBo7zSCac7pDuG42ZmnXC718CKZZ", "hash": "Qmb55YjV8z4PNChVKZM5rehSFLtgDURmDZ8d3DdkvBBZR1",
"name": "go-libp2p-host", "name": "go-libp2p-host",
"version": "1.3.1" "version": "1.3.4"
}, },
{ {
"author": "whyrusleeping", "author": "whyrusleeping",
"hash": "QmWfxnAiQ5TnnCgiX9ikVUKFNHRgGhbgKdx5DoKPELD7P4", "hash": "QmfSEkiXoCxTkRajpiHiyruDGSGpX9n6NivdFQJWBGVCQM",
"name": "go-libp2p-swarm", "name": "go-libp2p-swarm",
"version": "1.6.1" "version": "1.6.4"
}, },
{ {
"author": "whyrusleeping", "author": "whyrusleeping",
...@@ -229,15 +229,15 @@ ...@@ -229,15 +229,15 @@
}, },
{ {
"author": "whyrusleeping", "author": "whyrusleeping",
"hash": "QmWdGJY4fcsfhLHucEfivw8J71yUqNUFbzdU1jnJBnN5Xh", "hash": "QmUcaGGriHNMtx7udfjUFQ7yDVq8zopYRDey5Lj68z5DZQ",
"name": "go-libp2p-netutil", "name": "go-libp2p-netutil",
"version": "0.2.1" "version": "0.2.4"
}, },
{ {
"author": "whyrusleeping", "author": "whyrusleeping",
"hash": "QmSzhYTPRvh5nUJnRfYBW52QGX6jekULCRQcrxRs8hmzj4", "hash": "QmRg8JoySAwWunyjZwwo8fbm5ysw3vv3ggSHcmw8KXzk6f",
"name": "go-libp2p-blankhost", "name": "go-libp2p-blankhost",
"version": "0.1.1" "version": "0.1.4"
} }
], ],
"gxVersion": "0.4.0", "gxVersion": "0.4.0",
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment