diff --git a/.travis.yml b/.travis.yml index 072d4c8ddf68574354610ee5d26ec603051eea76..1b2ce8d8cf9329c6dd7f42c13d1640e3ae255aba 100644 --- a/.travis.yml +++ b/.travis.yml @@ -5,7 +5,7 @@ os: language: go go: - - 1.5.2 + - 1.7 env: - GO15VENDOREXPERIMENT=1 diff --git a/examples/hosts/main.go b/examples/hosts/main.go index 256fdbff8f8d6336caffe3d074aee16e90725977..2fe0b8fe5bba3cb73ef0cbf6a523da5a1abd5ac7 100644 --- a/examples/hosts/main.go +++ b/examples/hosts/main.go @@ -97,7 +97,7 @@ func main() { log.Println("opening stream...") // make a new stream from host B to host A // it should be handled on host A by the handler we set - s, err := ha.NewStream(context.Background(), "/hello/1.0.0", a.ID()) + s, err := ha.NewStream(context.Background(), a.ID(), "/hello/1.0.0") if err != nil { log.Fatalln(err) } diff --git a/p2p/host/basic/basic_host.go b/p2p/host/basic/basic_host.go index 68b3e08e013d96e69e082caaf4d2300c77548774..83c0f7050bbeaeda3d882ac0a8dc65606867a86a 100644 --- a/p2p/host/basic/basic_host.go +++ b/p2p/host/basic/basic_host.go @@ -111,10 +111,10 @@ func (h *BasicHost) newStreamHandler(s inet.Stream) { } return } + s.SetProtocol(protocol.ID(protoID)) - logStream := mstream.WrapStream(s, protocol.ID(protoID), h.bwc) + logStream := mstream.WrapStream(s, h.bwc) - s.SetProtocol(protoID) go handle(protoID, logStream) } @@ -150,7 +150,7 @@ func (h *BasicHost) IDService() *identify.IDService { func (h *BasicHost) SetStreamHandler(pid protocol.ID, handler inet.StreamHandler) { h.Mux().AddHandler(string(pid), func(p string, rwc io.ReadWriteCloser) error { is := rwc.(inet.Stream) - is.SetProtocol(p) + is.SetProtocol(protocol.ID(p)) handler(is) return nil }) @@ -161,7 +161,7 @@ func (h *BasicHost) SetStreamHandler(pid protocol.ID, handler inet.StreamHandler func (h *BasicHost) SetStreamHandlerMatch(pid protocol.ID, m func(string) bool, handler inet.StreamHandler) { h.Mux().AddHandlerWithFunc(string(pid), m, func(p string, rwc io.ReadWriteCloser) error { is := rwc.(inet.Stream) - is.SetProtocol(p) + is.SetProtocol(protocol.ID(p)) handler(is) return nil }) @@ -176,13 +176,69 @@ func (h *BasicHost) RemoveStreamHandler(pid protocol.ID) { // header with given protocol.ID. If there is no connection to p, attempts // to create one. If ProtocolID is "", writes no header. // (Threadsafe) -func (h *BasicHost) NewStream(ctx context.Context, pid protocol.ID, p peer.ID) (inet.Stream, error) { +func (h *BasicHost) NewStream(ctx context.Context, p peer.ID, pids ...protocol.ID) (inet.Stream, error) { + pref, err := h.preferredProtocol(p, pids) + if err != nil { + return nil, err + } + + if pref != "" { + return h.newStream(ctx, p, pref) + } + + var protoStrs []string + for _, pid := range pids { + protoStrs = append(protoStrs, string(pid)) + } + s, err := h.Network().NewStream(ctx, p) if err != nil { return nil, err } - logStream := mstream.WrapStream(s, pid, h.bwc) + selected, err := msmux.SelectOneOf(protoStrs, s) + if err != nil { + s.Close() + return nil, err + } + selpid := protocol.ID(selected) + s.SetProtocol(selpid) + h.Peerstore().AddProtocols(p, selected) + + return mstream.WrapStream(s, h.bwc), nil +} + +func pidsToStrings(pids []protocol.ID) []string { + out := make([]string, len(pids)) + for i, p := range pids { + out[i] = string(p) + } + return out +} + +func (h *BasicHost) preferredProtocol(p peer.ID, pids []protocol.ID) (protocol.ID, error) { + pidstrs := pidsToStrings(pids) + supported, err := h.Peerstore().SupportsProtocols(p, pidstrs...) + if err != nil { + return "", err + } + + var out protocol.ID + if len(supported) > 0 { + out = protocol.ID(supported[0]) + } + return out, nil +} + +func (h *BasicHost) newStream(ctx context.Context, p peer.ID, pid protocol.ID) (inet.Stream, error) { + s, err := h.Network().NewStream(ctx, p) + if err != nil { + return nil, err + } + + s.SetProtocol(pid) + + logStream := mstream.WrapStream(s, h.bwc) lzcon := msmux.NewMSSelect(logStream, string(pid)) return &streamWrapper{ diff --git a/p2p/host/basic/basic_host_test.go b/p2p/host/basic/basic_host_test.go index f1f2e248a3fa4629e7369461c3777f94614d39d6..8ff677dbfa32076f8b408f074e7bf6145fca7cea 100644 --- a/p2p/host/basic/basic_host_test.go +++ b/p2p/host/basic/basic_host_test.go @@ -4,7 +4,9 @@ import ( "bytes" "io" "testing" + "time" + host "github.com/libp2p/go-libp2p/p2p/host" inet "github.com/libp2p/go-libp2p/p2p/net" protocol "github.com/libp2p/go-libp2p/p2p/protocol" testutil "github.com/libp2p/go-libp2p/p2p/test/util" @@ -32,7 +34,7 @@ func TestHostSimple(t *testing.T) { io.Copy(w, s) // mirror everything }) - s, err := h1.NewStream(ctx, protocol.TestingID, h2pi.ID) + s, err := h1.NewStream(ctx, h2pi.ID, protocol.TestingID) if err != nil { t.Fatal(err) } @@ -61,3 +63,182 @@ func TestHostSimple(t *testing.T) { t.Fatal("buf1 != buf3 -- %x != %x", buf1, buf3) } } + +func getHostPair(ctx context.Context, t *testing.T) (host.Host, host.Host) { + h1 := testutil.GenHostSwarm(t, ctx) + h2 := testutil.GenHostSwarm(t, ctx) + + h2pi := h2.Peerstore().PeerInfo(h2.ID()) + if err := h1.Connect(ctx, h2pi); err != nil { + t.Fatal(err) + } + + return h1, h2 +} + +func assertWait(t *testing.T, c chan protocol.ID, exp protocol.ID) { + select { + case proto := <-c: + if proto != exp { + t.Fatal("should have connected on ", exp) + } + case <-time.After(time.Second * 5): + t.Fatal("timeout waiting for stream") + } +} + +func TestHostProtoPreference(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + h1, h2 := getHostPair(ctx, t) + defer h1.Close() + defer h2.Close() + + protoOld := protocol.ID("/testing") + protoNew := protocol.ID("/testing/1.1.0") + protoMinor := protocol.ID("/testing/1.2.0") + + connectedOn := make(chan protocol.ID, 16) + + handler := func(s inet.Stream) { + connectedOn <- s.Protocol() + s.Close() + } + + h1.SetStreamHandler(protoOld, handler) + + s, err := h2.NewStream(ctx, h1.ID(), protoMinor, protoNew, protoOld) + if err != nil { + t.Fatal(err) + } + + assertWait(t, connectedOn, protoOld) + s.Close() + + mfunc, err := host.MultistreamSemverMatcher(protoMinor) + if err != nil { + t.Fatal(err) + } + + h1.SetStreamHandlerMatch(protoMinor, mfunc, handler) + + // remembered preference will be chosen first, even when the other side newly supports it + s2, err := h2.NewStream(ctx, h1.ID(), protoMinor, protoNew, protoOld) + if err != nil { + t.Fatal(err) + } + + // required to force 'lazy' handshake + _, err = s2.Write([]byte("hello")) + if err != nil { + t.Fatal(err) + } + + assertWait(t, connectedOn, protoOld) + + s2.Close() + + s3, err := h2.NewStream(ctx, h1.ID(), protoMinor) + if err != nil { + t.Fatal(err) + } + + assertWait(t, connectedOn, protoMinor) + s3.Close() +} + +func TestHostProtoMismatch(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + h1, h2 := getHostPair(ctx, t) + defer h1.Close() + defer h2.Close() + + h1.SetStreamHandler("/super", func(s inet.Stream) { + t.Error("shouldnt get here") + s.Close() + }) + + _, err := h2.NewStream(ctx, h1.ID(), "/foo", "/bar", "/baz/1.0.0") + if err == nil { + t.Fatal("expected new stream to fail") + } +} + +func TestHostProtoPreknowledge(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + h1 := testutil.GenHostSwarm(t, ctx) + h2 := testutil.GenHostSwarm(t, ctx) + + conn := make(chan protocol.ID, 16) + handler := func(s inet.Stream) { + conn <- s.Protocol() + s.Close() + } + + h1.SetStreamHandler("/super", handler) + + h2pi := h2.Peerstore().PeerInfo(h2.ID()) + if err := h1.Connect(ctx, h2pi); err != nil { + t.Fatal(err) + } + defer h1.Close() + defer h2.Close() + + // wait for identify handshake to finish completely + time.Sleep(time.Millisecond * 20) + + h1.SetStreamHandler("/foo", handler) + + s, err := h2.NewStream(ctx, h1.ID(), "/foo", "/bar", "/super") + if err != nil { + t.Fatal(err) + } + + select { + case p := <-conn: + t.Fatal("shouldnt have gotten connection yet, we should have a lazy stream: ", p) + case <-time.After(time.Millisecond * 50): + } + + _, err = s.Read(nil) + if err != nil { + t.Fatal(err) + } + + assertWait(t, conn, "/super") + + s.Close() +} + +func TestNewDialOld(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + h1, h2 := getHostPair(ctx, t) + defer h1.Close() + defer h2.Close() + + connectedOn := make(chan protocol.ID, 16) + h1.SetStreamHandler("/testing", func(s inet.Stream) { + connectedOn <- s.Protocol() + s.Close() + }) + + s, err := h2.NewStream(ctx, h1.ID(), "/testing/1.0.0", "/testing") + if err != nil { + t.Fatal(err) + } + + assertWait(t, connectedOn, "/testing") + + if s.Protocol() != "/testing" { + t.Fatal("shoould have gotten /testing") + } + + s.Close() +} diff --git a/p2p/host/host.go b/p2p/host/host.go index 65810e0334a86557e5b361a0de02433e8ead0f3b..c4da4f51b376047dec4ea00fcb38d123185bc69e 100644 --- a/p2p/host/host.go +++ b/p2p/host/host.go @@ -60,7 +60,7 @@ type Host interface { // header with given protocol.ID. If there is no connection to p, attempts // to create one. If ProtocolID is "", writes no header. // (Threadsafe) - NewStream(ctx context.Context, pid protocol.ID, p peer.ID) (inet.Stream, error) + NewStream(ctx context.Context, p peer.ID, pids ...protocol.ID) (inet.Stream, error) // Close shuts down the host, its Network, and services. Close() error diff --git a/p2p/host/match.go b/p2p/host/match.go index dfee37e26b192342ef66061e7f1e7abd43507aef..571d652c23a6065072b1f496c816d98056b97285 100644 --- a/p2p/host/match.go +++ b/p2p/host/match.go @@ -1,13 +1,14 @@ package host import ( + "github.com/libp2p/go-libp2p/p2p/protocol" "strings" semver "github.com/coreos/go-semver/semver" ) -func MultistreamSemverMatcher(base string) (func(string) bool, error) { - parts := strings.Split(base, "/") +func MultistreamSemverMatcher(base protocol.ID) (func(string) bool, error) { + parts := strings.Split(string(base), "/") vers, err := semver.NewVersion(parts[len(parts)-1]) if err != nil { return nil, err diff --git a/p2p/host/routed/routed.go b/p2p/host/routed/routed.go index 4d25afdd4d69d21cb65d4010cd54cdc4ebbf7b0a..928297df7f8399b5ab842274c9e676182723179f 100644 --- a/p2p/host/routed/routed.go +++ b/p2p/host/routed/routed.go @@ -118,8 +118,8 @@ func (rh *RoutedHost) RemoveStreamHandler(pid protocol.ID) { rh.host.RemoveStreamHandler(pid) } -func (rh *RoutedHost) NewStream(ctx context.Context, pid protocol.ID, p peer.ID) (inet.Stream, error) { - return rh.host.NewStream(ctx, pid, p) +func (rh *RoutedHost) NewStream(ctx context.Context, p peer.ID, pids ...protocol.ID) (inet.Stream, error) { + return rh.host.NewStream(ctx, p, pids...) } func (rh *RoutedHost) Close() error { // no need to close IpfsRouting. we dont own it. diff --git a/p2p/metrics/stream/metered.go b/p2p/metrics/stream/metered.go index 2c7a4c6b947c06ad8c0d99f042d45b35ce727718..14de24c575de682e62686a11b6ea9b46b123241c 100644 --- a/p2p/metrics/stream/metered.go +++ b/p2p/metrics/stream/metered.go @@ -19,18 +19,18 @@ type meteredStream struct { mesRecv metrics.StreamMeterCallback } -func newMeteredStream(base inet.Stream, pid protocol.ID, p peer.ID, recvCB, sentCB metrics.StreamMeterCallback) inet.Stream { +func newMeteredStream(base inet.Stream, p peer.ID, recvCB, sentCB metrics.StreamMeterCallback) inet.Stream { return &meteredStream{ Stream: base, mesSent: sentCB, mesRecv: recvCB, - protoKey: pid, + protoKey: base.Protocol(), peerKey: p, } } -func WrapStream(base inet.Stream, pid protocol.ID, bwc metrics.Reporter) inet.Stream { - return newMeteredStream(base, pid, base.Conn().RemotePeer(), bwc.LogRecvMessageStream, bwc.LogSentMessageStream) +func WrapStream(base inet.Stream, bwc metrics.Reporter) inet.Stream { + return newMeteredStream(base, base.Conn().RemotePeer(), bwc.LogRecvMessageStream, bwc.LogSentMessageStream) } func (s *meteredStream) Read(b []byte) (int, error) { diff --git a/p2p/metrics/stream/metered_test.go b/p2p/metrics/stream/metered_test.go index 5af586ee20999558434bb4cf7846b70ee638457c..22b3f9370149d0eba22d7211f793b2c9ada71299 100644 --- a/p2p/metrics/stream/metered_test.go +++ b/p2p/metrics/stream/metered_test.go @@ -24,6 +24,10 @@ func (fs *FakeStream) Write(b []byte) (int, error) { return len(b), nil } +func (fs *FakeStream) Protocol() protocol.ID { + return "TEST" +} + func TestCallbacksWork(t *testing.T) { fake := new(FakeStream) @@ -38,7 +42,7 @@ func TestCallbacksWork(t *testing.T) { recv += n } - ms := newMeteredStream(fake, protocol.ID("TEST"), peer.ID("PEER"), recvCB, sentCB) + ms := newMeteredStream(fake, peer.ID("PEER"), recvCB, sentCB) toWrite := int64(100000) toRead := int64(100000) diff --git a/p2p/net/interface.go b/p2p/net/interface.go index 12826ed8ba79651c0df680d629df75dc337a09a1..8ae6078499e0a568f6f4aa39b9d873667608e0b4 100644 --- a/p2p/net/interface.go +++ b/p2p/net/interface.go @@ -8,6 +8,7 @@ import ( ma "github.com/jbenet/go-multiaddr" "github.com/jbenet/goprocess" conn "github.com/libp2p/go-libp2p/p2p/net/conn" + protocol "github.com/libp2p/go-libp2p/p2p/protocol" context "golang.org/x/net/context" ) @@ -26,8 +27,8 @@ type Stream interface { io.Writer io.Closer - Protocol() string - SetProtocol(string) + Protocol() protocol.ID + SetProtocol(protocol.ID) // Conn returns the connection this stream is part of. Conn() Conn diff --git a/p2p/net/mock/mock_stream.go b/p2p/net/mock/mock_stream.go index 2de6df2f3c62a571cf3cf8853ac89cfad94833ff..dac95688f4a2c3044df93eeba16323513a0da1b4 100644 --- a/p2p/net/mock/mock_stream.go +++ b/p2p/net/mock/mock_stream.go @@ -7,6 +7,7 @@ import ( process "github.com/jbenet/goprocess" inet "github.com/libp2p/go-libp2p/p2p/net" + protocol "github.com/libp2p/go-libp2p/p2p/protocol" ) // stream implements inet.Stream @@ -17,7 +18,7 @@ type stream struct { toDeliver chan *transportObject proc process.Process - protocol string + protocol protocol.ID } type transportObject struct { @@ -50,11 +51,11 @@ func (s *stream) Write(p []byte) (n int, err error) { return len(p), nil } -func (s *stream) Protocol() string { +func (s *stream) Protocol() protocol.ID { return s.protocol } -func (s *stream) SetProtocol(proto string) { +func (s *stream) SetProtocol(proto protocol.ID) { s.protocol = proto } diff --git a/p2p/net/mock/mock_test.go b/p2p/net/mock/mock_test.go index 9c33b3bb6a1c3045e7321bb3e4c57468866cb14d..973104e255f2d8f83ac2052c64c7df55fa1cc3b6 100644 --- a/p2p/net/mock/mock_test.go +++ b/p2p/net/mock/mock_test.go @@ -298,7 +298,7 @@ func TestStreams(t *testing.T) { h.SetStreamHandler(protocol.TestingID, handler) } - s, err := hosts[0].NewStream(ctx, protocol.TestingID, hosts[1].ID()) + s, err := hosts[0].NewStream(ctx, hosts[1].ID(), protocol.TestingID) if err != nil { t.Fatal(err) } @@ -386,7 +386,7 @@ func TestStreamsStress(t *testing.T) { defer wg.Done() from := rand.Intn(len(hosts)) to := rand.Intn(len(hosts)) - s, err := hosts[from].NewStream(ctx, protocol.TestingID, hosts[to].ID()) + s, err := hosts[from].NewStream(ctx, hosts[to].ID(), protocol.TestingID) if err != nil { log.Debugf("%d (%s) %d (%s)", from, hosts[from], to, hosts[to]) panic(err) @@ -466,7 +466,7 @@ func TestAdding(t *testing.T) { } ctx := context.Background() - s, err := h1.NewStream(ctx, protocol.TestingID, p2) + s, err := h1.NewStream(ctx, p2, protocol.TestingID) if err != nil { t.Fatal(err) } @@ -563,7 +563,7 @@ func TestLimitedStreams(t *testing.T) { } ctx := context.Background() - s, err := hosts[0].NewStream(ctx, protocol.TestingID, hosts[1].ID()) + s, err := hosts[0].NewStream(ctx, hosts[1].ID(), protocol.TestingID) if err != nil { t.Fatal(err) } diff --git a/p2p/net/swarm/swarm_net_test.go b/p2p/net/swarm/swarm_net_test.go index 1294e899ba46ddc433758202bef8fd46c009d880..4d564e3901fc2ee086e8d5fa8f71a856ba5ad99a 100644 --- a/p2p/net/swarm/swarm_net_test.go +++ b/p2p/net/swarm/swarm_net_test.go @@ -68,6 +68,8 @@ func TestConnectednessCorrect(t *testing.T) { t.Fatal(err) } + time.Sleep(time.Millisecond * 50) + expectConnectedness(t, nets[2], nets[1], inet.NotConnected) for _, n := range nets { diff --git a/p2p/net/swarm/swarm_stream.go b/p2p/net/swarm/swarm_stream.go index 86719819a512b57191974b1ddd39cf1f75ed8683..dc365e8c092c79d439daf532980412d88e07008d 100644 --- a/p2p/net/swarm/swarm_stream.go +++ b/p2p/net/swarm/swarm_stream.go @@ -2,6 +2,7 @@ package swarm import ( inet "github.com/libp2p/go-libp2p/p2p/net" + protocol "github.com/libp2p/go-libp2p/p2p/protocol" ps "github.com/jbenet/go-peerstream" ) @@ -10,7 +11,7 @@ import ( // our Conn and Swarm (instead of just the ps.Conn and ps.Swarm) type Stream struct { stream *ps.Stream - protocol string + protocol protocol.ID } // Stream returns the underlying peerstream.Stream @@ -44,11 +45,11 @@ func (s *Stream) Close() error { return s.stream.Close() } -func (s *Stream) Protocol() string { +func (s *Stream) Protocol() protocol.ID { return s.protocol } -func (s *Stream) SetProtocol(p string) { +func (s *Stream) SetProtocol(p protocol.ID) { s.protocol = p } diff --git a/p2p/protocol/identify/id.go b/p2p/protocol/identify/id.go index cba8b8eccdc21df54b65ba19e3bf312df48ef8aa..39f49f6146263b7cc6e4ca6b6a2526d98dcde894 100644 --- a/p2p/protocol/identify/id.go +++ b/p2p/protocol/identify/id.go @@ -86,8 +86,10 @@ func (ids *IDService) IdentifyConn(c inet.Conn) { return } + s.SetProtocol(ID) + bwc := ids.Host.GetBandwidthReporter() - s = mstream.WrapStream(s, ID, bwc) + s = mstream.WrapStream(s, bwc) // ok give the response to our handler. if err := msmux.SelectProtoOrFail(ID, s); err != nil { @@ -115,7 +117,7 @@ func (ids *IDService) RequestHandler(s inet.Stream) { c := s.Conn() bwc := ids.Host.GetBandwidthReporter() - s = mstream.WrapStream(s, ID, bwc) + s = mstream.WrapStream(s, bwc) w := ggio.NewDelimitedWriter(s) mes := pb.Identify{} @@ -173,7 +175,7 @@ func (ids *IDService) consumeMessage(mes *pb.Identify, c inet.Conn) { p := c.RemotePeer() // mes.Protocols - ids.Host.Peerstore().SetProtocols(p, mes.Protocols) + ids.Host.Peerstore().AddProtocols(p, mes.Protocols...) // mes.ObservedAddr ids.consumeObservedAddress(mes.GetObservedAddr(), c) diff --git a/p2p/protocol/ping/ping.go b/p2p/protocol/ping/ping.go index e16ff6686e7de78c5403d2873fec359673b9b6e5..e4dc40a967905342f48882af609907cb6d9527b7 100644 --- a/p2p/protocol/ping/ping.go +++ b/p2p/protocol/ping/ping.go @@ -68,7 +68,7 @@ func (p *PingService) PingHandler(s inet.Stream) { } func (ps *PingService) Ping(ctx context.Context, p peer.ID) (<-chan time.Duration, error) { - s, err := ps.Host.NewStream(ctx, ID, p) + s, err := ps.Host.NewStream(ctx, p, ID) if err != nil { return nil, err } diff --git a/p2p/protocol/relay/relay.go b/p2p/protocol/relay/relay.go index 6fa3c52a552b3b1cd734c3de46c9d8269c1a3d75..885018dee6fefb1a213f95ed855ee8a165a0d40f 100644 --- a/p2p/protocol/relay/relay.go +++ b/p2p/protocol/relay/relay.go @@ -123,7 +123,7 @@ func (rs *RelayService) pipeStream(src, dst peer.ID, s inet.Stream) error { // for now, can only open streams to directly connected peers. // maybe we can do some routing later on. func (rs *RelayService) openStreamToPeer(ctx context.Context, p peer.ID) (inet.Stream, error) { - return rs.host.NewStream(ctx, ID, p) + return rs.host.NewStream(ctx, p, ID) } func ReadHeader(r io.Reader) (src, dst peer.ID, err error) { diff --git a/p2p/protocol/relay/relay_test.go b/p2p/protocol/relay/relay_test.go index 858b525776e6c671e0725cf413e08d64edd647a8..e93491f793bf82ea94a9773bb02e54d676a98021 100644 --- a/p2p/protocol/relay/relay_test.go +++ b/p2p/protocol/relay/relay_test.go @@ -49,7 +49,7 @@ func TestRelaySimple(t *testing.T) { // ok, now we can try to relay n1--->n2--->n3. log.Debug("open relay stream") - s, err := n1.NewStream(ctx, relay.ID, n2p) + s, err := n1.NewStream(ctx, n2p, relay.ID) if err != nil { t.Fatal(err) } @@ -144,7 +144,7 @@ func TestRelayAcrossFour(t *testing.T) { // ok, now we can try to relay n1--->n2--->n3--->n4--->n5 log.Debug("open relay stream") - s, err := n1.NewStream(ctx, relay.ID, n2p) + s, err := n1.NewStream(ctx, n2p, relay.ID) if err != nil { t.Fatal(err) } @@ -244,7 +244,7 @@ func TestRelayStress(t *testing.T) { // ok, now we can try to relay n1--->n2--->n3. log.Debug("open relay stream") - s, err := n1.NewStream(ctx, relay.ID, n2p) + s, err := n1.NewStream(ctx, n2p, relay.ID) if err != nil { t.Fatal(err) } diff --git a/p2p/test/backpressure/backpressure_test.go b/p2p/test/backpressure/backpressure_test.go index 859692753c2696920d3fa3ed3e5fa05708d0fddc..aedc0e3b82928dfae9f33c667b82be68d023e1e8 100644 --- a/p2p/test/backpressure/backpressure_test.go +++ b/p2p/test/backpressure/backpressure_test.go @@ -83,7 +83,7 @@ a problem. }() for { - s, err = host.NewStream(context.Background(), protocol.TestingID, remote) + s, err = host.NewStream(context.Background(), remote, protocol.TestingID) if err != nil { return } @@ -285,7 +285,7 @@ func TestStBackpressureStreamWrite(t *testing.T) { } // open a stream, from 2->1, this is our reader - s, err := h2.NewStream(context.Background(), protocol.TestingID, h1.ID()) + s, err := h2.NewStream(context.Background(), h1.ID(), protocol.TestingID) if err != nil { t.Fatal(err) } diff --git a/p2p/test/reconnects/reconnect_test.go b/p2p/test/reconnects/reconnect_test.go index 49125a39740cf5d16669ed4f0470f662fbd6d243..aea77cd861b099e3f99f8d101b01f757dff35cce 100644 --- a/p2p/test/reconnects/reconnect_test.go +++ b/p2p/test/reconnects/reconnect_test.go @@ -177,7 +177,7 @@ func SubtestConnSendDisc(t *testing.T, hosts []host.Host) { for i := 0; i < numStreams; i++ { h1 := hosts[i%len(hosts)] h2 := hosts[(i+1)%len(hosts)] - s, err := h1.NewStream(context.Background(), protocol.TestingID, h2.ID()) + s, err := h1.NewStream(context.Background(), h2.ID(), protocol.TestingID) if err != nil { t.Error(err) } diff --git a/package.json b/package.json index 934dc798babc2a727650ee478c146bc875455216..623b72a2284d9bd78c449c83f6096c8a786422f6 100644 --- a/package.json +++ b/package.json @@ -34,9 +34,9 @@ "version": "1.0.0" }, { - "hash": "QmZiG1UAEz6n5XpMGcKavQjcZxKcu8FKgboC7WCDZJM9RW", + "hash": "QmatJnBK2qyjcy1AYq4Gb5YH16YM7uibdteQ589r46YLvB", "name": "go-multistream", - "version": "0.3.0" + "version": "0.3.1" }, { "hash": "QmNLvkCDV6ZjUJsEwGNporYBuZdhWT6q7TBVYQwwRv12HT", @@ -54,9 +54,9 @@ "version": "0.0.0" }, { - "hash": "QmNQynaz7qfriSUJkiEZUrm2Wen1u3Kj9goZzWtrPyu7XR", + "hash": "QmSpJByNKFX1sCsHBEp3R73FL4NF6FnQTEGyNAXHm2GS52", "name": "go-log", - "version": "1.1.2" + "version": "1.2.0" }, { "hash": "QmPpRcbNUXauP3zWZ1NJMLWpe4QnmEHrd2ba2D3yqWznw7", @@ -147,21 +147,21 @@ }, { "author": "whyrusleeping", - "hash": "QmNaS34WZRjs4U1kDfRR2aooYSsBSusFPMEg3dAKk7VZid", + "hash": "QmSVddpXhD2QziagxjdRs7eUeNypGugGxWqDsFeauv33XR", "name": "go-libp2p-loggables", - "version": "1.0.7" + "version": "1.0.9" }, { "author": "whyrusleeping", - "hash": "QmVjz1uf6U3sVQ5DbWWj7ktTtDd4GgsptYc7FBp33nWE53", + "hash": "QmbP93111oShRbdjoWvP3NZCUApTLkaPXgaNDaXZPfHQHR", "name": "go-libp2p-secio", - "version": "1.0.8" + "version": "1.0.11" }, { "author": "whyrusleeping", - "hash": "QmWzfrG1PUeF8mDpYfNsRL3wh5Rkgnp68LAWUB2bhuDWRL", + "hash": "QmbRuJ16EfPGWuEguoNxwPacpeHFUAZb2XQXVvUt5Vxg5q", "name": "go-libp2p-transport", - "version": "1.3.2" + "version": "1.3.5" }, { "author": "whyrusleeping", @@ -171,9 +171,9 @@ }, { "author": "whyrusleeping", - "hash": "QmRz4k7KxxdBiKByEVw7caET27b3bQvnGLFDnsdSJV3ruH", + "hash": "Qme8hbiTP4VNr1s7FxsfnnqrxbxPz3KPWtuGYeGbtFnhGC", "name": "go-smux-multistream", - "version": "1.3.0" + "version": "1.3.1" }, { "author": "whyrusleeping", @@ -183,9 +183,9 @@ }, { "author": "whyrusleeping", - "hash": "QmQdnfvZQuhdT93LNc5bos52wAmdr3G2p6G8teLJMEN32P", + "hash": "QmSZi9ygLohBUGyHMqE5N6eToPwqcg7bZQTULeVLFu7Q6d", "name": "go-libp2p-peerstore", - "version": "1.1.2" + "version": "1.2.2" } ], "gxVersion": "0.4.0",