Commit b54517ee authored by Juan Batiz-Benet's avatar Juan Batiz-Benet
Browse files

net2: separate protocols/services out.

using a placeholder net2 package so tests continue to pass.
Will be swapped atomically into main code.
parent 2c6c64b2
package swarm
import (
"sync"
"testing"
"time"
peer "github.com/jbenet/go-ipfs/p2p/peer"
context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
ma "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr"
)
func TestSimultOpen(t *testing.T) {
// t.Skip("skipping for another test")
ctx := context.Background()
swarms, peers := makeSwarms(ctx, t, 2)
// connect everyone
{
var wg sync.WaitGroup
connect := func(s *Swarm, dst peer.ID, addr ma.Multiaddr) {
// copy for other peer
s.peers.AddAddress(dst, addr)
if _, err := s.Dial(ctx, dst); err != nil {
t.Fatal("error swarm dialing to peer", err)
}
wg.Done()
}
log.Info("Connecting swarms simultaneously.")
wg.Add(2)
go connect(swarms[0], swarms[1].local, peers[1].Addr)
go connect(swarms[1], swarms[0].local, peers[0].Addr)
wg.Wait()
}
for _, s := range swarms {
s.Close()
}
}
func TestSimultOpenMany(t *testing.T) {
// t.Skip("very very slow")
addrs := 20
SubtestSwarm(t, addrs, 10)
}
func TestSimultOpenFewStress(t *testing.T) {
if testing.Short() {
t.SkipNow()
}
// t.Skip("skipping for another test")
msgs := 40
swarms := 2
rounds := 10
// rounds := 100
for i := 0; i < rounds; i++ {
SubtestSwarm(t, swarms, msgs)
<-time.After(10 * time.Millisecond)
}
}
// package swarm implements a connection muxer with a pair of channels
// to synchronize all network communication.
package swarm
import (
inet "github.com/jbenet/go-ipfs/p2p/net2"
peer "github.com/jbenet/go-ipfs/p2p/peer"
eventlog "github.com/jbenet/go-ipfs/util/eventlog"
context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
ctxgroup "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-ctxgroup"
ma "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr"
ps "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-peerstream"
)
var log = eventlog.Logger("swarm2")
// Swarm is a connection muxer, allowing connections to other peers to
// be opened and closed, while still using the same Chan for all
// communication. The Chan sends/receives Messages, which note the
// destination or source Peer.
//
// Uses peerstream.Swarm
type Swarm struct {
swarm *ps.Swarm
local peer.ID
peers peer.Peerstore
connh ConnHandler
cg ctxgroup.ContextGroup
}
// NewSwarm constructs a Swarm, with a Chan.
func NewSwarm(ctx context.Context, listenAddrs []ma.Multiaddr,
local peer.ID, peers peer.Peerstore) (*Swarm, error) {
s := &Swarm{
swarm: ps.NewSwarm(),
local: local,
peers: peers,
cg: ctxgroup.WithContext(ctx),
}
// configure Swarm
s.cg.SetTeardown(s.teardown)
s.SetConnHandler(nil) // make sure to setup our own conn handler.
return s, s.listen(listenAddrs)
}
func (s *Swarm) teardown() error {
return s.swarm.Close()
}
// CtxGroup returns the Context Group of the swarm
func (s *Swarm) CtxGroup() ctxgroup.ContextGroup {
return s.cg
}
// Close stops the Swarm.
func (s *Swarm) Close() error {
return s.cg.Close()
}
// StreamSwarm returns the underlying peerstream.Swarm
func (s *Swarm) StreamSwarm() *ps.Swarm {
return s.swarm
}
// SetConnHandler assigns the handler for new connections.
// See peerstream. You will rarely use this. See SetStreamHandler
func (s *Swarm) SetConnHandler(handler ConnHandler) {
// handler is nil if user wants to clear the old handler.
if handler == nil {
s.swarm.SetConnHandler(func(psconn *ps.Conn) {
s.connHandler(psconn)
})
return
}
s.swarm.SetConnHandler(func(psconn *ps.Conn) {
// sc is nil if closed in our handler.
if sc := s.connHandler(psconn); sc != nil {
// call the user's handler. in a goroutine for sync safety.
go handler(sc)
}
})
}
// SetStreamHandler assigns the handler for new streams.
// See peerstream.
func (s *Swarm) SetStreamHandler(handler inet.StreamHandler) {
s.swarm.SetStreamHandler(func(s *ps.Stream) {
handler(wrapStream(s))
})
}
// NewStreamWithPeer creates a new stream on any available connection to p
func (s *Swarm) NewStreamWithPeer(p peer.ID) (*Stream, error) {
// if we have no connections, try connecting.
if len(s.ConnectionsToPeer(p)) == 0 {
log.Debug("Swarm: NewStreamWithPeer no connections. Attempting to connect...")
if _, err := s.Dial(context.Background(), p); err != nil {
return nil, err
}
}
log.Debug("Swarm: NewStreamWithPeer...")
st, err := s.swarm.NewStreamWithGroup(p)
return wrapStream(st), err
}
// StreamsWithPeer returns all the live Streams to p
func (s *Swarm) StreamsWithPeer(p peer.ID) []*Stream {
return wrapStreams(ps.StreamsWithGroup(p, s.swarm.Streams()))
}
// ConnectionsToPeer returns all the live connections to p
func (s *Swarm) ConnectionsToPeer(p peer.ID) []*Conn {
return wrapConns(ps.ConnsWithGroup(p, s.swarm.Conns()))
}
// Connections returns a slice of all connections.
func (s *Swarm) Connections() []*Conn {
return wrapConns(s.swarm.Conns())
}
// CloseConnection removes a given peer from swarm + closes the connection
func (s *Swarm) CloseConnection(p peer.ID) error {
conns := s.swarm.ConnsWithGroup(p) // boom.
for _, c := range conns {
c.Close()
}
return nil
}
// Peers returns a copy of the set of peers swarm is connected to.
func (s *Swarm) Peers() []peer.ID {
conns := s.Connections()
seen := make(map[peer.ID]struct{})
peers := make([]peer.ID, 0, len(conns))
for _, c := range conns {
p := c.RemotePeer()
if _, found := seen[p]; found {
continue
}
peers = append(peers, p)
}
return peers
}
// LocalPeer returns the local peer swarm is associated to.
func (s *Swarm) LocalPeer() peer.ID {
return s.local
}
package swarm
import (
"fmt"
ic "github.com/jbenet/go-ipfs/p2p/crypto"
conn "github.com/jbenet/go-ipfs/p2p/net/conn"
inet "github.com/jbenet/go-ipfs/p2p/net2"
peer "github.com/jbenet/go-ipfs/p2p/peer"
context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
ma "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr"
ps "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-peerstream"
)
// a Conn is a simple wrapper around a ps.Conn that also exposes
// some of the methods from the underlying conn.Conn.
// There's **five** "layers" to each connection:
// * 0. the net.Conn - underlying net.Conn (TCP/UDP/UTP/etc)
// * 1. the manet.Conn - provides multiaddr friendly Conn
// * 2. the conn.Conn - provides Peer friendly Conn (inc Secure channel)
// * 3. the peerstream.Conn - provides peerstream / spdysptream happiness
// * 4. the Conn - abstracts everyting out, exposing only key parts of underlying layers
// (I know, this is kinda crazy. it's more historical than a good design. though the
// layers do build up pieces of functionality. and they're all just io.RW :) )
type Conn ps.Conn
// ConnHandler is called when new conns are opened from remote peers.
// See peerstream.ConnHandler
type ConnHandler func(*Conn)
func (c *Conn) StreamConn() *ps.Conn {
return (*ps.Conn)(c)
}
func (c *Conn) RawConn() conn.Conn {
// righly panic if these things aren't true. it is an expected
// invariant that these Conns are all of the typewe expect:
// ps.Conn wrapping a conn.Conn
// if we get something else it is programmer error.
return (*ps.Conn)(c).NetConn().(conn.Conn)
}
func (c *Conn) String() string {
return fmt.Sprintf("<SwarmConn %s>", c.RawConn())
}
// LocalMultiaddr is the Multiaddr on this side
func (c *Conn) LocalMultiaddr() ma.Multiaddr {
return c.RawConn().LocalMultiaddr()
}
// LocalPeer is the Peer on our side of the connection
func (c *Conn) LocalPeer() peer.ID {
return c.RawConn().LocalPeer()
}
// RemoteMultiaddr is the Multiaddr on the remote side
func (c *Conn) RemoteMultiaddr() ma.Multiaddr {
return c.RawConn().RemoteMultiaddr()
}
// RemotePeer is the Peer on the remote side
func (c *Conn) RemotePeer() peer.ID {
return c.RawConn().RemotePeer()
}
// LocalPrivateKey is the public key of the peer on this side
func (c *Conn) LocalPrivateKey() ic.PrivKey {
return c.RawConn().LocalPrivateKey()
}
// RemotePublicKey is the public key of the peer on the remote side
func (c *Conn) RemotePublicKey() ic.PubKey {
return c.RawConn().RemotePublicKey()
}
// NewSwarmStream returns a new Stream from this connection
func (c *Conn) NewSwarmStream() (*Stream, error) {
s, err := c.StreamConn().NewStream()
return wrapStream(s), err
}
// NewStream returns a new Stream from this connection
func (c *Conn) NewStream() (inet.Stream, error) {
s, err := c.NewSwarmStream()
return inet.Stream(s), err
}
func (c *Conn) Close() error {
return c.StreamConn().Close()
}
func wrapConn(psc *ps.Conn) (*Conn, error) {
// grab the underlying connection.
if _, ok := psc.NetConn().(conn.Conn); !ok {
// this should never happen. if we see it ocurring it means that we added
// a Listener to the ps.Swarm that is NOT one of our net/conn.Listener.
return nil, fmt.Errorf("swarm connHandler: invalid conn (not a conn.Conn): %s", psc)
}
return (*Conn)(psc), nil
}
// wrapConns returns a *Conn for all these ps.Conns
func wrapConns(conns1 []*ps.Conn) []*Conn {
conns2 := make([]*Conn, len(conns1))
for i, c1 := range conns1 {
if c2, err := wrapConn(c1); err == nil {
conns2[i] = c2
}
}
return conns2
}
// newConnSetup does the swarm's "setup" for a connection. returns the underlying
// conn.Conn this method is used by both swarm.Dial and ps.Swarm connHandler
func (s *Swarm) newConnSetup(ctx context.Context, psConn *ps.Conn) (*Conn, error) {
// wrap with a Conn
sc, err := wrapConn(psConn)
if err != nil {
return nil, err
}
// if we have a public key, make sure we add it to our peerstore!
// This is an important detail. Otherwise we must fetch the public
// key from the DHT or some other system.
if pk := sc.RemotePublicKey(); pk != nil {
s.peers.AddPubKey(sc.RemotePeer(), pk)
}
// ok great! we can use it. add it to our group.
// set the RemotePeer as a group on the conn. this lets us group
// connections in the StreamSwarm by peer, and get a streams from
// any available connection in the group (better multiconn):
// swarm.StreamSwarm().NewStreamWithGroup(remotePeer)
psConn.AddGroup(sc.RemotePeer())
return sc, nil
}
package swarm
import (
"errors"
"fmt"
conn "github.com/jbenet/go-ipfs/p2p/net/conn"
peer "github.com/jbenet/go-ipfs/p2p/peer"
lgbl "github.com/jbenet/go-ipfs/util/eventlog/loggables"
context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
)
// Dial connects to a peer.
//
// The idea is that the client of Swarm does not need to know what network
// the connection will happen over. Swarm can use whichever it choses.
// This allows us to use various transport protocols, do NAT traversal/relay,
// etc. to achive connection.
func (s *Swarm) Dial(ctx context.Context, p peer.ID) (*Conn, error) {
if p == s.local {
return nil, errors.New("Attempted connection to self!")
}
// check if we already have an open connection first
cs := s.ConnectionsToPeer(p)
for _, c := range cs {
if c != nil { // dump out the first one we find
return c, nil
}
}
sk := s.peers.PrivKey(s.local)
if sk == nil {
// may be fine for sk to be nil, just log a warning.
log.Warning("Dial not given PrivateKey, so WILL NOT SECURE conn.")
}
remoteAddrs := s.peers.Addresses(p)
if len(remoteAddrs) == 0 {
return nil, errors.New("peer has no addresses")
}
localAddrs := s.peers.Addresses(s.local)
if len(localAddrs) == 0 {
log.Debug("Dialing out with no local addresses.")
}
// open connection to peer
d := &conn.Dialer{
LocalPeer: s.local,
LocalAddrs: localAddrs,
PrivateKey: sk,
}
// try to connect to one of the peer's known addresses.
// for simplicity, we do this sequentially.
// A future commit will do this asynchronously.
var connC conn.Conn
var err error
for _, addr := range remoteAddrs {
connC, err = d.Dial(ctx, addr, p)
if err == nil {
break
}
}
if err != nil {
return nil, err
}
// ok try to setup the new connection.
swarmC, err := dialConnSetup(ctx, s, connC)
if err != nil {
log.Error("Dial newConnSetup failed. disconnecting.")
log.Event(ctx, "dialFailureDisconnect", lgbl.NetConn(connC), lgbl.Error(err))
swarmC.Close() // close the connection. didn't work out :(
return nil, err
}
log.Event(ctx, "dial", p)
return swarmC, nil
}
// dialConnSetup is the setup logic for a connection from the dial side. it
// needs to add the Conn to the StreamSwarm, then run newConnSetup
func dialConnSetup(ctx context.Context, s *Swarm, connC conn.Conn) (*Conn, error) {
psC, err := s.swarm.AddConn(connC)
if err != nil {
// connC is closed by caller if we fail.
return nil, fmt.Errorf("failed to add conn to ps.Swarm: %s", err)
}
// ok try to setup the new connection. (newConnSetup will add to group)
swarmC, err := s.newConnSetup(ctx, psC)
if err != nil {
log.Error("Dial newConnSetup failed. disconnecting.")
log.Event(ctx, "dialFailureDisconnect", lgbl.NetConn(connC), lgbl.Error(err))
swarmC.Close() // we need to call this to make sure psC is Closed.
return nil, err
}
return swarmC, err
}
package swarm
import (
conn "github.com/jbenet/go-ipfs/p2p/net/conn"
lgbl "github.com/jbenet/go-ipfs/util/eventlog/loggables"
context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
ma "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr"
ps "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-peerstream"
multierr "github.com/jbenet/go-ipfs/util/multierr"
)
// Open listeners for each network the swarm should listen on
func (s *Swarm) listen(addrs []ma.Multiaddr) error {
retErr := multierr.New()
// listen on every address
for i, addr := range addrs {
err := s.setupListener(addr)
if err != nil {
if retErr.Errors == nil {
retErr.Errors = make([]error, len(addrs))
}
retErr.Errors[i] = err
log.Errorf("Failed to listen on: %s - %s", addr, err)
}
}
if retErr.Errors != nil {
return retErr
}
return nil
}
// Listen for new connections on the given multiaddr
func (s *Swarm) setupListener(maddr ma.Multiaddr) error {
// TODO rethink how this has to work. (jbenet)
//
// resolved, err := resolveUnspecifiedAddresses([]ma.Multiaddr{maddr})
// if err != nil {
// return err
// }
// for _, a := range resolved {
// s.peers.AddAddress(s.local, a)
// }
sk := s.peers.PrivKey(s.local)
if sk == nil {
// may be fine for sk to be nil, just log a warning.
log.Warning("Listener not given PrivateKey, so WILL NOT SECURE conns.")
}
list, err := conn.Listen(s.cg.Context(), maddr, s.local, sk)
if err != nil {
return err
}
// AddListener to the peerstream Listener. this will begin accepting connections
// and streams!
_, err = s.swarm.AddListener(list)
return err
}
// connHandler is called by the StreamSwarm whenever a new connection is added
// here we configure it slightly. Note that this is sequential, so if anything
// will take a while do it in a goroutine.
// See https://godoc.org/github.com/jbenet/go-peerstream for more information
func (s *Swarm) connHandler(c *ps.Conn) *Conn {
ctx := context.Background()
// this context is for running the handshake, which -- when receiveing connections
// -- we have no bound on beyond what the transport protocol bounds it at.
// note that setup + the handshake are bounded by underlying io.
// (i.e. if TCP or UDP disconnects (or the swarm closes), we're done.
// Q: why not have a shorter handshake? think about an HTTP server on really slow conns.
// as long as the conn is live (TCP says its online), it tries its best. we follow suit.)
sc, err := s.newConnSetup(ctx, c)
if err != nil {
log.Error(err)
log.Event(ctx, "newConnHandlerDisconnect", lgbl.NetConn(c.NetConn()), lgbl.Error(err))
c.Close() // boom. close it.
return nil
}
return sc
}
package swarm
import (
"fmt"
peer "github.com/jbenet/go-ipfs/p2p/peer"
inet "github.com/jbenet/go-ipfs/p2p/net2"
context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
ctxgroup "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-ctxgroup"
ma "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr"
)
// Network implements the inet.Network interface.
// It is simply a swarm, with a few different functions
// to implement inet.Network.
type Network Swarm
// NewNetwork constructs a new network and starts listening on given addresses.
func NewNetwork(ctx context.Context, listen []ma.Multiaddr, local peer.ID,
peers peer.Peerstore) (*Network, error) {
s, err := NewSwarm(ctx, listen, local, peers)
if err != nil {
return nil, err
}
return (*Network)(s), nil
}
// DialPeer attempts to establish a connection to a given peer.
// Respects the context.
func (n *Network) DialPeer(ctx context.Context, p peer.ID) (inet.Conn, error) {
log.Debugf("[%s] network dialing peer [%s]", n.local, p)
sc, err := n.Swarm().Dial(ctx, p)
if err != nil {
return nil, err
}
log.Debugf("network for %s finished dialing %s", n.local, p)
return inet.Conn(sc), nil
}
// CtxGroup returns the network's ContextGroup
func (n *Network) CtxGroup() ctxgroup.ContextGroup {
return n.cg
}
// Swarm returns the network's peerstream.Swarm
func (n *Network) Swarm() *Swarm {
return (*Swarm)(n)
}
// LocalPeer the network's LocalPeer
func (n *Network) LocalPeer() peer.ID {
return n.Swarm().LocalPeer()
}
// Peers returns the connected peers
func (n *Network) Peers() []peer.ID {
return n.Swarm().Peers()
}
// Peers returns the connected peers
func (n *Network) Peerstore() peer.Peerstore {
return n.Swarm().peers
}
// Conns returns the connected peers
func (n *Network) Conns() []inet.Conn {
conns1 := n.Swarm().Connections()
out := make([]inet.Conn, len(conns1))
for i, c := range conns1 {
out[i] = inet.Conn(c)
}
return out
}
// ConnsToPeer returns the connections in this Netowrk for given peer.
func (n *Network) ConnsToPeer(p peer.ID) []inet.Conn {
conns1 := n.Swarm().ConnectionsToPeer(p)
out := make([]inet.Conn, len(conns1))
for i, c := range conns1 {
out[i] = inet.Conn(c)
}
return out
}
// ClosePeer connection to peer
func (n *Network) ClosePeer(p peer.ID) error {
return n.Swarm().CloseConnection(p)
}
// close is the real teardown function
func (n *Network) close() error {
return n.Swarm().Close()
}
// Close calls the ContextCloser func
func (n *Network) Close() error {
return n.Swarm().cg.Close()
}
// ListenAddresses returns a list of addresses at which this network listens.
func (n *Network) ListenAddresses() []ma.Multiaddr {
return n.Swarm().ListenAddresses()
}
// InterfaceListenAddresses returns a list of addresses at which this network
// listens. It expands "any interface" addresses (/ip4/0.0.0.0, /ip6/::) to
// use the known local interfaces.
func (n *Network) InterfaceListenAddresses() ([]ma.Multiaddr, error) {
return InterfaceListenAddresses(n.Swarm())
}
// Connectedness returns a state signaling connection capabilities
// For now only returns Connected || NotConnected. Expand into more later.
func (n *Network) Connectedness(p peer.ID) inet.Connectedness {
c := n.Swarm().ConnectionsToPeer(p)
if c != nil && len(c) > 0 {
return inet.Connected
}
return inet.NotConnected
}
// NewStream returns a new stream to given peer p.
// If there is no connection to p, attempts to create one.
func (n *Network) NewStream(p peer.ID) (inet.Stream, error) {
log.Debugf("[%s] network opening stream to peer [%s]", n.local, p)
s, err := n.Swarm().NewStreamWithPeer(p)
if err != nil {
return nil, err
}
return inet.Stream(s), nil
}
// SetHandler sets the protocol handler on the Network's Muxer.
// This operation is threadsafe.
func (n *Network) SetStreamHandler(h inet.StreamHandler) {
n.Swarm().SetStreamHandler(h)
}
// SetConnHandler sets the conn handler on the Network.
// This operation is threadsafe.
func (n *Network) SetConnHandler(h inet.ConnHandler) {
n.Swarm().SetConnHandler(func(c *Conn) {
h(inet.Conn(c))
})
}
// String returns a string representation of Network.
func (n *Network) String() string {
return fmt.Sprintf("<Network %s>", n.LocalPeer())
}
package swarm_test
import (
"fmt"
"testing"
"time"
context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
inet "github.com/jbenet/go-ipfs/p2p/net"
netutil "github.com/jbenet/go-ipfs/p2p/net/swarmnet/util"
)
// TestConnectednessCorrect starts a few networks, connects a few
// and tests Connectedness value is correct.
func TestConnectednessCorrect(t *testing.T) {
ctx := context.Background()
nets := make([]inet.Network, 4)
for i := 0; i < 4; i++ {
nets[i] = netutil.GenNetwork(t, ctx)
}
// connect 0-1, 0-2, 0-3, 1-2, 2-3
dial := func(a, b inet.Network) {
netutil.DivulgeAddresses(b, a)
if err := a.DialPeer(ctx, b.LocalPeer()); err != nil {
t.Fatalf("Failed to dial: %s", err)
}
}
dial(nets[0], nets[1])
dial(nets[0], nets[3])
dial(nets[1], nets[2])
dial(nets[3], nets[2])
// there's something wrong with dial, i think. it's not finishing
// completely. there must be some async stuff.
<-time.After(100 * time.Millisecond)
// test those connected show up correctly
// test connected
expectConnectedness(t, nets[0], nets[1], inet.Connected)
expectConnectedness(t, nets[0], nets[3], inet.Connected)
expectConnectedness(t, nets[1], nets[2], inet.Connected)
expectConnectedness(t, nets[3], nets[2], inet.Connected)
// test not connected
expectConnectedness(t, nets[0], nets[2], inet.NotConnected)
expectConnectedness(t, nets[1], nets[3], inet.NotConnected)
for _, n := range nets {
n.Close()
}
}
func expectConnectedness(t *testing.T, a, b inet.Network, expected inet.Connectedness) {
es := "%s is connected to %s, but Connectedness incorrect. %s %s"
if a.Connectedness(b.LocalPeer()) != expected {
t.Errorf(es, a, b, printConns(a), printConns(b))
}
// test symmetric case
if b.Connectedness(a.LocalPeer()) != expected {
t.Errorf(es, b, a, printConns(b), printConns(a))
}
}
func printConns(n inet.Network) string {
s := fmt.Sprintf("Connections in %s:\n", n)
for _, c := range n.Conns() {
s = s + fmt.Sprintf("- %s\n", c)
}
return s
}
package swarm
import (
inet "github.com/jbenet/go-ipfs/p2p/net2"
ps "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-peerstream"
)
// a Stream is a wrapper around a ps.Stream that exposes a way to get
// our Conn and Swarm (instead of just the ps.Conn and ps.Swarm)
type Stream ps.Stream
// Stream returns the underlying peerstream.Stream
func (s *Stream) Stream() *ps.Stream {
return (*ps.Stream)(s)
}
// Conn returns the Conn associated with this Stream, as an inet.Conn
func (s *Stream) Conn() inet.Conn {
return s.SwarmConn()
}
// SwarmConn returns the Conn associated with this Stream, as a *Conn
func (s *Stream) SwarmConn() *Conn {
return (*Conn)(s.Stream().Conn())
}
// Wait waits for the stream to receive a reply.
func (s *Stream) Wait() error {
return s.Stream().Wait()
}
// Read reads bytes from a stream.
func (s *Stream) Read(p []byte) (n int, err error) {
return s.Stream().Read(p)
}
// Write writes bytes to a stream, flushing for each call.
func (s *Stream) Write(p []byte) (n int, err error) {
return s.Stream().Write(p)
}
// Close closes the stream, indicating this side is finished
// with the stream.
func (s *Stream) Close() error {
return s.Stream().Close()
}
func wrapStream(pss *ps.Stream) *Stream {
return (*Stream)(pss)
}
func wrapStreams(st []*ps.Stream) []*Stream {
out := make([]*Stream, len(st))
for i, s := range st {
out[i] = wrapStream(s)
}
return out
}
package swarm
import (
"bytes"
"io"
"sync"
"testing"
"time"
inet "github.com/jbenet/go-ipfs/p2p/net2"
peer "github.com/jbenet/go-ipfs/p2p/peer"
errors "github.com/jbenet/go-ipfs/util/debugerror"
testutil "github.com/jbenet/go-ipfs/util/testutil"
context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
ma "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr"
)
func EchoStreamHandler(stream inet.Stream) {
go func() {
defer stream.Close()
// pull out the ipfs conn
c := stream.Conn()
log.Debugf("%s ponging to %s", c.LocalPeer(), c.RemotePeer())
buf := make([]byte, 4)
for {
if _, err := stream.Read(buf); err != nil {
if err != io.EOF {
log.Error("ping receive error:", err)
}
return
}
if !bytes.Equal(buf, []byte("ping")) {
log.Errorf("ping receive error: ping != %s %v", buf, buf)
return
}
if _, err := stream.Write([]byte("pong")); err != nil {
log.Error("pond send error:", err)
return
}
}
}()
}
func makeSwarms(ctx context.Context, t *testing.T, num int) ([]*Swarm, []testutil.PeerNetParams) {
swarms := make([]*Swarm, 0, num)
peersnp := make([]testutil.PeerNetParams, 0, num)
for i := 0; i < num; i++ {
localnp := testutil.RandPeerNetParamsOrFatal(t)
peersnp = append(peersnp, localnp)
peerstore := peer.NewPeerstore()
peerstore.AddAddress(localnp.ID, localnp.Addr)
peerstore.AddPubKey(localnp.ID, localnp.PubKey)
peerstore.AddPrivKey(localnp.ID, localnp.PrivKey)
addrs := peerstore.Addresses(localnp.ID)
swarm, err := NewSwarm(ctx, addrs, localnp.ID, peerstore)
if err != nil {
t.Fatal(err)
}
swarm.SetStreamHandler(EchoStreamHandler)
swarms = append(swarms, swarm)
}
return swarms, peersnp
}
func connectSwarms(t *testing.T, ctx context.Context, swarms []*Swarm, peersnp []testutil.PeerNetParams) {
var wg sync.WaitGroup
connect := func(s *Swarm, dst peer.ID, addr ma.Multiaddr) {
// TODO: make a DialAddr func.
s.peers.AddAddress(dst, addr)
if _, err := s.Dial(ctx, dst); err != nil {
t.Fatal("error swarm dialing to peer", err)
}
wg.Done()
}
log.Info("Connecting swarms simultaneously.")
for _, s := range swarms {
for _, p := range peersnp {
if p.ID != s.local { // don't connect to self.
wg.Add(1)
connect(s, p.ID, p.Addr)
}
}
}
wg.Wait()
for _, s := range swarms {
log.Infof("%s swarm routing table: %s", s.local, s.Peers())
}
}
func SubtestSwarm(t *testing.T, SwarmNum int, MsgNum int) {
// t.Skip("skipping for another test")
ctx := context.Background()
swarms, peersnp := makeSwarms(ctx, t, SwarmNum)
// connect everyone
connectSwarms(t, ctx, swarms, peersnp)
// ping/pong
for _, s1 := range swarms {
log.Debugf("-------------------------------------------------------")
log.Debugf("%s ping pong round", s1.local)
log.Debugf("-------------------------------------------------------")
_, cancel := context.WithCancel(ctx)
got := map[peer.ID]int{}
errChan := make(chan error, MsgNum*len(peersnp))
streamChan := make(chan *Stream, MsgNum)
// send out "ping" x MsgNum to every peer
go func() {
defer close(streamChan)
var wg sync.WaitGroup
send := func(p peer.ID) {
defer wg.Done()
// first, one stream per peer (nice)
stream, err := s1.NewStreamWithPeer(p)
if err != nil {
errChan <- errors.Wrap(err)
return
}
// send out ping!
for k := 0; k < MsgNum; k++ { // with k messages
msg := "ping"
log.Debugf("%s %s %s (%d)", s1.local, msg, p, k)
stream.Write([]byte(msg))
}
// read it later
streamChan <- stream
}
for _, p := range peersnp {
if p.ID == s1.local {
continue // dont send to self...
}
wg.Add(1)
go send(p.ID)
}
wg.Wait()
}()
// receive "pong" x MsgNum from every peer
go func() {
defer close(errChan)
count := 0
countShouldBe := MsgNum * (len(peersnp) - 1)
for stream := range streamChan { // one per peer
defer stream.Close()
// get peer on the other side
p := stream.Conn().RemotePeer()
// receive pings
msgCount := 0
msg := make([]byte, 4)
for k := 0; k < MsgNum; k++ { // with k messages
// read from the stream
if _, err := stream.Read(msg); err != nil {
errChan <- errors.Wrap(err)
continue
}
if string(msg) != "pong" {
errChan <- errors.Errorf("unexpected message: %s", msg)
continue
}
log.Debugf("%s %s %s (%d)", s1.local, msg, p, k)
msgCount++
}
got[p] = msgCount
count += msgCount
}
if count != countShouldBe {
errChan <- errors.Errorf("count mismatch: %d != %d", count, countShouldBe)
}
}()
// check any errors (blocks till consumer is done)
for err := range errChan {
if err != nil {
t.Fatal(err.Error())
}
}
log.Debugf("%s got pongs", s1.local)
if (len(peersnp) - 1) != len(got) {
t.Errorf("got (%d) less messages than sent (%d).", len(got), len(peersnp))
}
for p, n := range got {
if n != MsgNum {
t.Error("peer did not get all msgs", p, n, "/", MsgNum)
}
}
cancel()
<-time.After(10 * time.Millisecond)
}
for _, s := range swarms {
s.Close()
}
}
func TestSwarm(t *testing.T) {
// t.Skip("skipping for another test")
// msgs := 1000
msgs := 100
swarms := 5
SubtestSwarm(t, swarms, msgs)
}
func TestConnHandler(t *testing.T) {
// t.Skip("skipping for another test")
ctx := context.Background()
swarms, peersnp := makeSwarms(ctx, t, 5)
gotconn := make(chan struct{}, 10)
swarms[0].SetConnHandler(func(conn *Conn) {
gotconn <- struct{}{}
})
connectSwarms(t, ctx, swarms, peersnp)
<-time.After(time.Millisecond)
// should've gotten 5 by now.
swarms[0].SetConnHandler(nil)
expect := 4
for i := 0; i < expect; i++ {
select {
case <-time.After(time.Second):
t.Fatal("failed to get connections")
case <-gotconn:
}
}
select {
case <-gotconn:
t.Fatalf("should have connected to %d swarms", expect)
default:
}
}
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment