Commit 955ae93a authored by Jeromy's avatar Jeromy
Browse files

switch to new version of go-stream-muxer

parent be1f5580
...@@ -4,6 +4,7 @@ package swarm ...@@ -4,6 +4,7 @@ package swarm
import ( import (
"fmt" "fmt"
"io/ioutil"
"sync" "sync"
"time" "time"
...@@ -16,11 +17,13 @@ import ( ...@@ -16,11 +17,13 @@ import (
transport "github.com/ipfs/go-libp2p/p2p/net/transport" transport "github.com/ipfs/go-libp2p/p2p/net/transport"
peer "github.com/ipfs/go-libp2p/p2p/peer" peer "github.com/ipfs/go-libp2p/p2p/peer"
ps "gx/ipfs/QmQDPXRFzRcCGPbPViQCKjzbQBkZGpLV1f9KwXnksSNcTK/go-peerstream"
"gx/ipfs/QmQopLATEYMNg7dVqZRNDfeE2S1yKy8zrRh5xnYiuqeZBn/goprocess" "gx/ipfs/QmQopLATEYMNg7dVqZRNDfeE2S1yKy8zrRh5xnYiuqeZBn/goprocess"
goprocessctx "gx/ipfs/QmQopLATEYMNg7dVqZRNDfeE2S1yKy8zrRh5xnYiuqeZBn/goprocess/context" goprocessctx "gx/ipfs/QmQopLATEYMNg7dVqZRNDfeE2S1yKy8zrRh5xnYiuqeZBn/goprocess/context"
pst "gx/ipfs/QmTYr6RrJs8b63LTVwahmtytnuqzsLfNPBJp6EvmFWHbGh/go-stream-muxer" pst "gx/ipfs/QmWSJzRkCMJFHYUQZxKwPX8WA7XipaPtfiwMPARP51ymfn/go-stream-muxer"
psmss "gx/ipfs/QmTYr6RrJs8b63LTVwahmtytnuqzsLfNPBJp6EvmFWHbGh/go-stream-muxer/multistream" psmss "gx/ipfs/QmWSJzRkCMJFHYUQZxKwPX8WA7XipaPtfiwMPARP51ymfn/go-stream-muxer/multistream"
spdy "gx/ipfs/QmWSJzRkCMJFHYUQZxKwPX8WA7XipaPtfiwMPARP51ymfn/go-stream-muxer/spdystream"
yamux "gx/ipfs/QmWSJzRkCMJFHYUQZxKwPX8WA7XipaPtfiwMPARP51ymfn/go-stream-muxer/yamux"
ps "gx/ipfs/QmZK81vcgMhpb2t7GNbozk7qzt6Rj4zFqitpvsWT9mduW8/go-peerstream"
context "gx/ipfs/QmZy2y8t9zQH2a1b8q2ZSLKp17ATuJoCNxxyMFG5qFExpt/go-net/context" context "gx/ipfs/QmZy2y8t9zQH2a1b8q2ZSLKp17ATuJoCNxxyMFG5qFExpt/go-net/context"
logging "gx/ipfs/Qmazh5oNUVsDZTs2g59rq8aYQqwpss8tcUWQzor5sCCEuH/go-log" logging "gx/ipfs/Qmazh5oNUVsDZTs2g59rq8aYQqwpss8tcUWQzor5sCCEuH/go-log"
mafilter "gx/ipfs/QmcR6dLYF8Eozaae3wGd5wjq76bofzmmbvQmtwobxvfhEt/multiaddr-filter" mafilter "gx/ipfs/QmcR6dLYF8Eozaae3wGd5wjq76bofzmmbvQmtwobxvfhEt/multiaddr-filter"
...@@ -32,7 +35,21 @@ var log = logging.Logger("swarm2") ...@@ -32,7 +35,21 @@ var log = logging.Logger("swarm2")
var PSTransport pst.Transport var PSTransport pst.Transport
func init() { func init() {
PSTransport = psmss.NewTransport() msstpt := psmss.NewBlankTransport()
ymxtpt := &yamux.Transport{
AcceptBacklog: 2048,
ConnectionWriteTimeout: time.Second * 10,
KeepAliveInterval: time.Second * 30,
EnableKeepAlive: true,
MaxStreamWindowSize: uint32(1024 * 256),
LogOutput: ioutil.Discard,
}
msstpt.AddTransport("/yamux", ymxtpt)
msstpt.AddTransport("/spdystream", spdy.Transport)
PSTransport = msstpt
} }
// Swarm is a connection muxer, allowing connections to other peers to // Swarm is a connection muxer, allowing connections to other peers to
......
...@@ -8,7 +8,7 @@ import ( ...@@ -8,7 +8,7 @@ import (
conn "github.com/ipfs/go-libp2p/p2p/net/conn" conn "github.com/ipfs/go-libp2p/p2p/net/conn"
peer "github.com/ipfs/go-libp2p/p2p/peer" peer "github.com/ipfs/go-libp2p/p2p/peer"
ps "gx/ipfs/QmQDPXRFzRcCGPbPViQCKjzbQBkZGpLV1f9KwXnksSNcTK/go-peerstream" ps "gx/ipfs/QmZK81vcgMhpb2t7GNbozk7qzt6Rj4zFqitpvsWT9mduW8/go-peerstream"
context "gx/ipfs/QmZy2y8t9zQH2a1b8q2ZSLKp17ATuJoCNxxyMFG5qFExpt/go-net/context" context "gx/ipfs/QmZy2y8t9zQH2a1b8q2ZSLKp17ATuJoCNxxyMFG5qFExpt/go-net/context"
ma "gx/ipfs/QmcobAGsCjYt5DXoq9et9L8yR8er7o7Cu3DTvpaq12jYSz/go-multiaddr" ma "gx/ipfs/QmcobAGsCjYt5DXoq9et9L8yR8er7o7Cu3DTvpaq12jYSz/go-multiaddr"
) )
......
...@@ -9,10 +9,10 @@ import ( ...@@ -9,10 +9,10 @@ import (
conn "github.com/ipfs/go-libp2p/p2p/net/conn" conn "github.com/ipfs/go-libp2p/p2p/net/conn"
transport "github.com/ipfs/go-libp2p/p2p/net/transport" transport "github.com/ipfs/go-libp2p/p2p/net/transport"
ps "gx/ipfs/QmQDPXRFzRcCGPbPViQCKjzbQBkZGpLV1f9KwXnksSNcTK/go-peerstream" ps "gx/ipfs/QmZK81vcgMhpb2t7GNbozk7qzt6Rj4zFqitpvsWT9mduW8/go-peerstream"
context "gx/ipfs/QmZy2y8t9zQH2a1b8q2ZSLKp17ATuJoCNxxyMFG5qFExpt/go-net/context" context "gx/ipfs/QmZy2y8t9zQH2a1b8q2ZSLKp17ATuJoCNxxyMFG5qFExpt/go-net/context"
ma "gx/ipfs/QmcobAGsCjYt5DXoq9et9L8yR8er7o7Cu3DTvpaq12jYSz/go-multiaddr" ma "gx/ipfs/QmcobAGsCjYt5DXoq9et9L8yR8er7o7Cu3DTvpaq12jYSz/go-multiaddr"
) // Open listeners and reuse-dialers for the given addresses ) // Open listeners and reuse-dialers for the given addresses
func (s *Swarm) setupInterfaces(addrs []ma.Multiaddr) error { func (s *Swarm) setupInterfaces(addrs []ma.Multiaddr) error {
errs := make([]error, len(addrs)) errs := make([]error, len(addrs))
...@@ -152,7 +152,7 @@ func (s *Swarm) connHandler(c *ps.Conn) *Conn { ...@@ -152,7 +152,7 @@ func (s *Swarm) connHandler(c *ps.Conn) *Conn {
if err != nil { if err != nil {
log.Debug(err) log.Debug(err)
log.Event(ctx, "newConnHandlerDisconnect", lgbl.NetConn(c.NetConn()), lgbl.Error(err)) log.Event(ctx, "newConnHandlerDisconnect", lgbl.NetConn(c.NetConn()), lgbl.Error(err))
c.Close() // boom. close it. c.Close() // boom. close it.
return nil return nil
} }
......
...@@ -3,7 +3,7 @@ package swarm ...@@ -3,7 +3,7 @@ package swarm
import ( import (
inet "github.com/ipfs/go-libp2p/p2p/net" inet "github.com/ipfs/go-libp2p/p2p/net"
ps "gx/ipfs/QmQDPXRFzRcCGPbPViQCKjzbQBkZGpLV1f9KwXnksSNcTK/go-peerstream" ps "gx/ipfs/QmZK81vcgMhpb2t7GNbozk7qzt6Rj4zFqitpvsWT9mduW8/go-peerstream"
) )
// a Stream is a wrapper around a ps.Stream that exposes a way to get // a Stream is a wrapper around a ps.Stream that exposes a way to get
......
...@@ -24,7 +24,7 @@ func EchoStreamHandler(stream inet.Stream) { ...@@ -24,7 +24,7 @@ func EchoStreamHandler(stream inet.Stream) {
// pull out the ipfs conn // pull out the ipfs conn
c := stream.Conn() c := stream.Conn()
log.Errorf("%s ponging to %s", c.LocalPeer(), c.RemotePeer()) log.Infof("%s ponging to %s", c.LocalPeer(), c.RemotePeer())
buf := make([]byte, 4) buf := make([]byte, 4)
......
...@@ -12,7 +12,7 @@ import ( ...@@ -12,7 +12,7 @@ import (
swarm "github.com/ipfs/go-libp2p/p2p/net/swarm" swarm "github.com/ipfs/go-libp2p/p2p/net/swarm"
protocol "github.com/ipfs/go-libp2p/p2p/protocol" protocol "github.com/ipfs/go-libp2p/p2p/protocol"
testutil "github.com/ipfs/go-libp2p/p2p/test/util" testutil "github.com/ipfs/go-libp2p/p2p/test/util"
ps "gx/ipfs/QmQDPXRFzRcCGPbPViQCKjzbQBkZGpLV1f9KwXnksSNcTK/go-peerstream" ps "gx/ipfs/QmZK81vcgMhpb2t7GNbozk7qzt6Rj4zFqitpvsWT9mduW8/go-peerstream"
u "gx/ipfs/QmZNVWh8LLjAavuQ2JXuFmuYH3C11xo988vSgp7UQrTRj1/go-ipfs-util" u "gx/ipfs/QmZNVWh8LLjAavuQ2JXuFmuYH3C11xo988vSgp7UQrTRj1/go-ipfs-util"
context "gx/ipfs/QmZy2y8t9zQH2a1b8q2ZSLKp17ATuJoCNxxyMFG5qFExpt/go-net/context" context "gx/ipfs/QmZy2y8t9zQH2a1b8q2ZSLKp17ATuJoCNxxyMFG5qFExpt/go-net/context"
logging "gx/ipfs/Qmazh5oNUVsDZTs2g59rq8aYQqwpss8tcUWQzor5sCCEuH/go-log" logging "gx/ipfs/Qmazh5oNUVsDZTs2g59rq8aYQqwpss8tcUWQzor5sCCEuH/go-log"
...@@ -35,20 +35,20 @@ func EchoStreamHandler(stream inet.Stream) { ...@@ -35,20 +35,20 @@ func EchoStreamHandler(stream inet.Stream) {
} }
type sendChans struct { type sendChans struct {
send chan struct{} send chan struct{}
sent chan struct{} sent chan struct{}
read chan struct{} read chan struct{}
close_ chan struct{} close_ chan struct{}
closed chan struct{} closed chan struct{}
} }
func newSendChans() sendChans { func newSendChans() sendChans {
return sendChans{ return sendChans{
send: make(chan struct{}), send: make(chan struct{}),
sent: make(chan struct{}), sent: make(chan struct{}),
read: make(chan struct{}), read: make(chan struct{}),
close_: make(chan struct{}), close_: make(chan struct{}),
closed: make(chan struct{}), closed: make(chan struct{}),
} }
} }
...@@ -188,7 +188,7 @@ func SubtestConnSendDisc(t *testing.T, hosts []host.Host) { ...@@ -188,7 +188,7 @@ func SubtestConnSendDisc(t *testing.T, hosts []host.Host) {
go sF(s) go sF(s)
log.Debugf("getting handle %d", j) log.Debugf("getting handle %d", j)
sc := <-ss // wait to get handle. sc := <-ss // wait to get handle.
log.Debugf("spawning worker %d", j) log.Debugf("spawning worker %d", j)
for k := 0; k < numMsgs; k++ { for k := 0; k < numMsgs; k++ {
...@@ -215,7 +215,7 @@ func SubtestConnSendDisc(t *testing.T, hosts []host.Host) { ...@@ -215,7 +215,7 @@ func SubtestConnSendDisc(t *testing.T, hosts []host.Host) {
for _, c := range cs { for _, c := range cs {
sc := c.(*swarm.Conn) sc := c.(*swarm.Conn)
if sc.LocalPeer() > sc.RemotePeer() { if sc.LocalPeer() > sc.RemotePeer() {
continue // only close it on one side. continue // only close it on one side.
} }
log.Debugf("closing: %s", sc.RawConn()) log.Debugf("closing: %s", sc.RawConn())
......
...@@ -105,7 +105,7 @@ ...@@ -105,7 +105,7 @@
}, },
{ {
"name": "go-stream-muxer", "name": "go-stream-muxer",
"hash": "QmTYr6RrJs8b63LTVwahmtytnuqzsLfNPBJp6EvmFWHbGh", "hash": "QmWSJzRkCMJFHYUQZxKwPX8WA7XipaPtfiwMPARP51ymfn",
"version": "0.0.0" "version": "0.0.0"
}, },
{ {
...@@ -125,7 +125,7 @@ ...@@ -125,7 +125,7 @@
}, },
{ {
"name": "go-peerstream", "name": "go-peerstream",
"hash": "QmQDPXRFzRcCGPbPViQCKjzbQBkZGpLV1f9KwXnksSNcTK", "hash": "QmZK81vcgMhpb2t7GNbozk7qzt6Rj4zFqitpvsWT9mduW8",
"version": "0.0.0" "version": "0.0.0"
}, },
{ {
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment