Commit c0e2b930 authored by Jeromy's avatar Jeromy
Browse files

update multistream deps and fix code to work with new changes

parent 5b9cd671
...@@ -5,7 +5,6 @@ import ( ...@@ -5,7 +5,6 @@ import (
"fmt" "fmt"
"io/ioutil" "io/ioutil"
"log" "log"
"time"
pstore "github.com/ipfs/go-libp2p-peerstore" pstore "github.com/ipfs/go-libp2p-peerstore"
host "github.com/ipfs/go-libp2p/p2p/host" host "github.com/ipfs/go-libp2p/p2p/host"
......
...@@ -114,7 +114,8 @@ func (h *BasicHost) newStreamHandler(s inet.Stream) { ...@@ -114,7 +114,8 @@ func (h *BasicHost) newStreamHandler(s inet.Stream) {
logStream := mstream.WrapStream(s, protocol.ID(protoID), h.bwc) logStream := mstream.WrapStream(s, protocol.ID(protoID), h.bwc)
go handle(logStream) s.SetProtocol(protoID)
go handle(protoID, logStream)
} }
// ID returns the (local) peer.ID associated with this Host // ID returns the (local) peer.ID associated with this Host
...@@ -147,8 +148,10 @@ func (h *BasicHost) IDService() *identify.IDService { ...@@ -147,8 +148,10 @@ func (h *BasicHost) IDService() *identify.IDService {
// host.Mux().SetHandler(proto, handler) // host.Mux().SetHandler(proto, handler)
// (Threadsafe) // (Threadsafe)
func (h *BasicHost) SetStreamHandler(pid protocol.ID, handler inet.StreamHandler) { func (h *BasicHost) SetStreamHandler(pid protocol.ID, handler inet.StreamHandler) {
h.Mux().AddHandler(string(pid), func(rwc io.ReadWriteCloser) error { h.Mux().AddHandler(string(pid), func(p string, rwc io.ReadWriteCloser) error {
handler(rwc.(inet.Stream)) is := rwc.(inet.Stream)
is.SetProtocol(p)
handler(is)
return nil return nil
}) })
} }
......
...@@ -23,7 +23,7 @@ import ( ...@@ -23,7 +23,7 @@ import (
) )
func goroFilter(r *grc.Goroutine) bool { func goroFilter(r *grc.Goroutine) bool {
return strings.Contains(r.Function, "go-log.") return strings.Contains(r.Function, "go-log.") || strings.Contains(r.Stack[0], "testing.(*T).Run")
} }
func echoListen(ctx context.Context, listener Listener) { func echoListen(ctx context.Context, listener Listener) {
......
...@@ -26,6 +26,9 @@ type Stream interface { ...@@ -26,6 +26,9 @@ type Stream interface {
io.Writer io.Writer
io.Closer io.Closer
Protocol() string
SetProtocol(string)
// Conn returns the connection this stream is part of. // Conn returns the connection this stream is part of.
Conn() Conn Conn() Conn
} }
......
...@@ -16,6 +16,8 @@ type stream struct { ...@@ -16,6 +16,8 @@ type stream struct {
conn *conn conn *conn
toDeliver chan *transportObject toDeliver chan *transportObject
proc process.Process proc process.Process
protocol string
} }
type transportObject struct { type transportObject struct {
...@@ -48,6 +50,14 @@ func (s *stream) Write(p []byte) (n int, err error) { ...@@ -48,6 +50,14 @@ func (s *stream) Write(p []byte) (n int, err error) {
return len(p), nil return len(p), nil
} }
func (s *stream) Protocol() string {
return s.protocol
}
func (s *stream) SetProtocol(proto string) {
s.protocol = proto
}
func (s *stream) Close() error { func (s *stream) Close() error {
return s.proc.Close() return s.proc.Close()
} }
......
...@@ -340,9 +340,9 @@ func (n *ps2netNotifee) Disconnected(c *ps.Conn) { ...@@ -340,9 +340,9 @@ func (n *ps2netNotifee) Disconnected(c *ps.Conn) {
} }
func (n *ps2netNotifee) OpenedStream(s *ps.Stream) { func (n *ps2netNotifee) OpenedStream(s *ps.Stream) {
n.not.OpenedStream(n.net, inet.Stream((*Stream)(s))) n.not.OpenedStream(n.net, &Stream{stream: s})
} }
func (n *ps2netNotifee) ClosedStream(s *ps.Stream) { func (n *ps2netNotifee) ClosedStream(s *ps.Stream) {
n.not.ClosedStream(n.net, inet.Stream((*Stream)(s))) n.not.ClosedStream(n.net, &Stream{stream: s})
} }
...@@ -10,6 +10,12 @@ import ( ...@@ -10,6 +10,12 @@ import (
context "golang.org/x/net/context" context "golang.org/x/net/context"
) )
func streamsSame(a, b inet.Stream) bool {
sa := a.(*Stream)
sb := b.(*Stream)
return sa.Stream() == sb.Stream()
}
func TestNotifications(t *testing.T) { func TestNotifications(t *testing.T) {
ctx := context.Background() ctx := context.Background()
swarms := makeSwarms(ctx, t, 5) swarms := makeSwarms(ctx, t, 5)
...@@ -98,7 +104,7 @@ func TestNotifications(t *testing.T) { ...@@ -98,7 +104,7 @@ func TestNotifications(t *testing.T) {
case <-time.After(timeout): case <-time.After(timeout):
t.Fatal("timeout") t.Fatal("timeout")
} }
if s != s2 { if !streamsSame(s, s2) {
t.Fatal("got incorrect stream", s.Conn(), s2.Conn()) t.Fatal("got incorrect stream", s.Conn(), s2.Conn())
} }
...@@ -108,7 +114,7 @@ func TestNotifications(t *testing.T) { ...@@ -108,7 +114,7 @@ func TestNotifications(t *testing.T) {
case <-time.After(timeout): case <-time.After(timeout):
t.Fatal("timeout") t.Fatal("timeout")
} }
if s != s2 { if !streamsSame(s, s2) {
t.Fatal("got incorrect stream", s.Conn(), s2.Conn()) t.Fatal("got incorrect stream", s.Conn(), s2.Conn())
} }
} }
......
...@@ -8,11 +8,14 @@ import ( ...@@ -8,11 +8,14 @@ import (
// a Stream is a wrapper around a ps.Stream that exposes a way to get // a Stream is a wrapper around a ps.Stream that exposes a way to get
// our Conn and Swarm (instead of just the ps.Conn and ps.Swarm) // our Conn and Swarm (instead of just the ps.Conn and ps.Swarm)
type Stream ps.Stream type Stream struct {
stream *ps.Stream
protocol string
}
// Stream returns the underlying peerstream.Stream // Stream returns the underlying peerstream.Stream
func (s *Stream) Stream() *ps.Stream { func (s *Stream) Stream() *ps.Stream {
return (*ps.Stream)(s) return s.stream
} }
// Conn returns the Conn associated with this Stream, as an inet.Conn // Conn returns the Conn associated with this Stream, as an inet.Conn
...@@ -22,27 +25,37 @@ func (s *Stream) Conn() inet.Conn { ...@@ -22,27 +25,37 @@ func (s *Stream) Conn() inet.Conn {
// SwarmConn returns the Conn associated with this Stream, as a *Conn // SwarmConn returns the Conn associated with this Stream, as a *Conn
func (s *Stream) SwarmConn() *Conn { func (s *Stream) SwarmConn() *Conn {
return (*Conn)(s.Stream().Conn()) return (*Conn)(s.stream.Conn())
} }
// Read reads bytes from a stream. // Read reads bytes from a stream.
func (s *Stream) Read(p []byte) (n int, err error) { func (s *Stream) Read(p []byte) (n int, err error) {
return s.Stream().Read(p) return s.stream.Read(p)
} }
// Write writes bytes to a stream, flushing for each call. // Write writes bytes to a stream, flushing for each call.
func (s *Stream) Write(p []byte) (n int, err error) { func (s *Stream) Write(p []byte) (n int, err error) {
return s.Stream().Write(p) return s.stream.Write(p)
} }
// Close closes the stream, indicating this side is finished // Close closes the stream, indicating this side is finished
// with the stream. // with the stream.
func (s *Stream) Close() error { func (s *Stream) Close() error {
return s.Stream().Close() return s.stream.Close()
}
func (s *Stream) Protocol() string {
return s.protocol
}
func (s *Stream) SetProtocol(p string) {
s.protocol = p
} }
func wrapStream(pss *ps.Stream) *Stream { func wrapStream(pss *ps.Stream) *Stream {
return (*Stream)(pss) return &Stream{
stream: pss,
}
} }
func wrapStreams(st []*ps.Stream) []*Stream { func wrapStreams(st []*ps.Stream) []*Stream {
......
...@@ -34,9 +34,9 @@ ...@@ -34,9 +34,9 @@
"version": "1.0.0" "version": "1.0.0"
}, },
{ {
"hash": "Qmf91yhgRLo2dhhbc5zZ7TxjMaR1oxaWaoc9zRZdi1kU4a", "hash": "Qmc8WfU6Ci9e1qvTNYE3EUwrHEXfpxY7dNrWtVtjpYcp2P",
"name": "go-multistream", "name": "go-multistream",
"version": "0.0.0" "version": "0.1.0"
}, },
{ {
"hash": "QmNLvkCDV6ZjUJsEwGNporYBuZdhWT6q7TBVYQwwRv12HT", "hash": "QmNLvkCDV6ZjUJsEwGNporYBuZdhWT6q7TBVYQwwRv12HT",
...@@ -171,9 +171,9 @@ ...@@ -171,9 +171,9 @@
}, },
{ {
"author": "whyrusleeping", "author": "whyrusleeping",
"hash": "QmVcmcQE9eX4HQ8QwhVXpoHt3ennG7d299NDYFq9D1Uqa1", "hash": "QmRbnoT3xJXpi37Vc11e6VYV4RXKWUMsZtfbBQkR43377P",
"name": "go-smux-multistream", "name": "go-smux-multistream",
"version": "1.0.0" "version": "1.1.0"
}, },
{ {
"author": "whyrusleeping", "author": "whyrusleeping",
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment