Commit 72df463f authored by Juan Batiz-Benet's avatar Juan Batiz-Benet
Browse files

net -> p2p/net

The net package is the next to move. It will be massaged
a bit still to fix the Network / "NetworkBackend" conflict.
parent bfcb95d6
package swarm
import (
conn "github.com/jbenet/go-ipfs/p2p/net/conn"
lgbl "github.com/jbenet/go-ipfs/util/eventlog/loggables"
context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
ma "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr"
ps "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-peerstream"
multierr "github.com/jbenet/go-ipfs/util/multierr"
)
// Open listeners for each network the swarm should listen on
func (s *Swarm) listen(addrs []ma.Multiaddr) error {
retErr := multierr.New()
// listen on every address
for i, addr := range addrs {
err := s.setupListener(addr)
if err != nil {
if retErr.Errors == nil {
retErr.Errors = make([]error, len(addrs))
}
retErr.Errors[i] = err
log.Errorf("Failed to listen on: %s - %s", addr, err)
}
}
if retErr.Errors != nil {
return retErr
}
return nil
}
// Listen for new connections on the given multiaddr
func (s *Swarm) setupListener(maddr ma.Multiaddr) error {
// TODO rethink how this has to work. (jbenet)
//
// resolved, err := resolveUnspecifiedAddresses([]ma.Multiaddr{maddr})
// if err != nil {
// return err
// }
// for _, a := range resolved {
// s.peers.AddAddress(s.local, a)
// }
sk := s.peers.PrivKey(s.local)
if sk == nil {
// may be fine for sk to be nil, just log a warning.
log.Warning("Listener not given PrivateKey, so WILL NOT SECURE conns.")
}
list, err := conn.Listen(s.cg.Context(), maddr, s.local, sk)
if err != nil {
return err
}
// AddListener to the peerstream Listener. this will begin accepting connections
// and streams!
_, err = s.swarm.AddListener(list)
return err
}
// connHandler is called by the StreamSwarm whenever a new connection is added
// here we configure it slightly. Note that this is sequential, so if anything
// will take a while do it in a goroutine.
// See https://godoc.org/github.com/jbenet/go-peerstream for more information
func (s *Swarm) connHandler(c *ps.Conn) *Conn {
ctx := context.Background()
// this context is for running the handshake, which -- when receiveing connections
// -- we have no bound on beyond what the transport protocol bounds it at.
// note that setup + the handshake are bounded by underlying io.
// (i.e. if TCP or UDP disconnects (or the swarm closes), we're done.
// Q: why not have a shorter handshake? think about an HTTP server on really slow conns.
// as long as the conn is live (TCP says its online), it tries its best. we follow suit.)
sc, err := s.newConnSetup(ctx, c)
if err != nil {
log.Error(err)
log.Event(ctx, "newConnHandlerDisconnect", lgbl.NetConn(c.NetConn()), lgbl.Error(err))
c.Close() // boom. close it.
return nil
}
return sc
}
package swarm
import (
ps "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-peerstream"
)
// a Stream is a wrapper around a ps.Stream that exposes a way to get
// our Conn and Swarm (instead of just the ps.Conn and ps.Swarm)
type Stream ps.Stream
// StreamHandler is called when new streams are opened from remote peers.
// See peerstream.StreamHandler
type StreamHandler func(*Stream)
// Stream returns the underlying peerstream.Stream
func (s *Stream) Stream() *ps.Stream {
return (*ps.Stream)(s)
}
// Conn returns the Conn associated with this Stream
func (s *Stream) Conn() *Conn {
return (*Conn)(s.Stream().Conn())
}
// Read reads bytes from a stream.
func (s *Stream) Read(p []byte) (n int, err error) {
return s.Stream().Read(p)
}
// Write writes bytes to a stream, flushing for each call.
func (s *Stream) Write(p []byte) (n int, err error) {
return s.Stream().Write(p)
}
// Close closes the stream, indicating this side is finished
// with the stream.
func (s *Stream) Close() error {
return s.Stream().Close()
}
func wrapStream(pss *ps.Stream) *Stream {
return (*Stream)(pss)
}
func wrapStreams(st []*ps.Stream) []*Stream {
out := make([]*Stream, len(st))
for i, s := range st {
out[i] = wrapStream(s)
}
return out
}
package swarm
import (
"bytes"
"io"
"sync"
"testing"
"time"
peer "github.com/jbenet/go-ipfs/p2p/peer"
errors "github.com/jbenet/go-ipfs/util/debugerror"
testutil "github.com/jbenet/go-ipfs/util/testutil"
context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
ma "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr"
)
func EchoStreamHandler(stream *Stream) {
go func() {
defer stream.Close()
// pull out the ipfs conn
c := stream.Conn().RawConn()
log.Debugf("%s ponging to %s", c.LocalPeer(), c.RemotePeer())
buf := make([]byte, 4)
for {
if _, err := stream.Read(buf); err != nil {
if err != io.EOF {
log.Error("ping receive error:", err)
}
return
}
if !bytes.Equal(buf, []byte("ping")) {
log.Errorf("ping receive error: ping != %s %v", buf, buf)
return
}
if _, err := stream.Write([]byte("pong")); err != nil {
log.Error("pond send error:", err)
return
}
}
}()
}
func makeSwarms(ctx context.Context, t *testing.T, num int) ([]*Swarm, []testutil.PeerNetParams) {
swarms := make([]*Swarm, 0, num)
peersnp := make([]testutil.PeerNetParams, 0, num)
for i := 0; i < num; i++ {
localnp := testutil.RandPeerNetParamsOrFatal(t)
peersnp = append(peersnp, localnp)
peerstore := peer.NewPeerstore()
peerstore.AddAddress(localnp.ID, localnp.Addr)
peerstore.AddPubKey(localnp.ID, localnp.PubKey)
peerstore.AddPrivKey(localnp.ID, localnp.PrivKey)
addrs := peerstore.Addresses(localnp.ID)
swarm, err := NewSwarm(ctx, addrs, localnp.ID, peerstore)
if err != nil {
t.Fatal(err)
}
swarm.SetStreamHandler(EchoStreamHandler)
swarms = append(swarms, swarm)
}
return swarms, peersnp
}
func connectSwarms(t *testing.T, ctx context.Context, swarms []*Swarm, peersnp []testutil.PeerNetParams) {
var wg sync.WaitGroup
connect := func(s *Swarm, dst peer.ID, addr ma.Multiaddr) {
// TODO: make a DialAddr func.
s.peers.AddAddress(dst, addr)
if _, err := s.Dial(ctx, dst); err != nil {
t.Fatal("error swarm dialing to peer", err)
}
wg.Done()
}
log.Info("Connecting swarms simultaneously.")
for _, s := range swarms {
for _, p := range peersnp {
if p.ID != s.local { // don't connect to self.
wg.Add(1)
connect(s, p.ID, p.Addr)
}
}
}
wg.Wait()
for _, s := range swarms {
log.Infof("%s swarm routing table: %s", s.local, s.Peers())
}
}
func SubtestSwarm(t *testing.T, SwarmNum int, MsgNum int) {
// t.Skip("skipping for another test")
ctx := context.Background()
swarms, peersnp := makeSwarms(ctx, t, SwarmNum)
// connect everyone
connectSwarms(t, ctx, swarms, peersnp)
// ping/pong
for _, s1 := range swarms {
log.Debugf("-------------------------------------------------------")
log.Debugf("%s ping pong round", s1.local)
log.Debugf("-------------------------------------------------------")
_, cancel := context.WithCancel(ctx)
got := map[peer.ID]int{}
errChan := make(chan error, MsgNum*len(peersnp))
streamChan := make(chan *Stream, MsgNum)
// send out "ping" x MsgNum to every peer
go func() {
defer close(streamChan)
var wg sync.WaitGroup
send := func(p peer.ID) {
defer wg.Done()
// first, one stream per peer (nice)
stream, err := s1.NewStreamWithPeer(p)
if err != nil {
errChan <- errors.Wrap(err)
return
}
// send out ping!
for k := 0; k < MsgNum; k++ { // with k messages
msg := "ping"
log.Debugf("%s %s %s (%d)", s1.local, msg, p, k)
if _, err := stream.Write([]byte(msg)); err != nil {
errChan <- err
continue
}
}
// read it later
streamChan <- stream
}
for _, p := range peersnp {
if p.ID == s1.local {
continue // dont send to self...
}
wg.Add(1)
go send(p.ID)
}
wg.Wait()
}()
// receive "pong" x MsgNum from every peer
go func() {
defer close(errChan)
count := 0
countShouldBe := MsgNum * (len(peersnp) - 1)
for stream := range streamChan { // one per peer
defer stream.Close()
// get peer on the other side
p := stream.Conn().RemotePeer()
// receive pings
msgCount := 0
msg := make([]byte, 4)
for k := 0; k < MsgNum; k++ { // with k messages
// read from the stream
if _, err := stream.Read(msg); err != nil {
errChan <- errors.Wrap(err)
continue
}
if string(msg) != "pong" {
errChan <- errors.Errorf("unexpected message: %s", msg)
continue
}
log.Debugf("%s %s %s (%d)", s1.local, msg, p, k)
msgCount++
}
got[p] = msgCount
count += msgCount
}
if count != countShouldBe {
errChan <- errors.Errorf("count mismatch: %d != %d", count, countShouldBe)
}
}()
// check any errors (blocks till consumer is done)
for err := range errChan {
if err != nil {
t.Error(err.Error())
}
}
log.Debugf("%s got pongs", s1.local)
if (len(peersnp) - 1) != len(got) {
t.Errorf("got (%d) less messages than sent (%d).", len(got), len(peersnp))
}
for p, n := range got {
if n != MsgNum {
t.Error("peer did not get all msgs", p, n, "/", MsgNum)
}
}
cancel()
<-time.After(10 * time.Millisecond)
}
for _, s := range swarms {
s.Close()
}
}
func TestSwarm(t *testing.T) {
// t.Skip("skipping for another test")
// msgs := 1000
msgs := 100
swarms := 5
SubtestSwarm(t, swarms, msgs)
}
func TestConnHandler(t *testing.T) {
// t.Skip("skipping for another test")
ctx := context.Background()
swarms, peersnp := makeSwarms(ctx, t, 5)
gotconn := make(chan struct{}, 10)
swarms[0].SetConnHandler(func(conn *Conn) {
gotconn <- struct{}{}
})
connectSwarms(t, ctx, swarms, peersnp)
<-time.After(time.Millisecond)
// should've gotten 5 by now.
swarms[0].SetConnHandler(nil)
expect := 4
for i := 0; i < expect; i++ {
select {
case <-time.After(time.Second):
t.Fatal("failed to get connections")
case <-gotconn:
}
}
select {
case <-gotconn:
t.Fatalf("should have connected to %d swarms", expect)
default:
}
}
// Package net provides an interface for ipfs to interact with the network through
package net
import (
"fmt"
ic "github.com/jbenet/go-ipfs/p2p/crypto"
peer "github.com/jbenet/go-ipfs/p2p/peer"
inet "github.com/jbenet/go-ipfs/p2p/net"
ids "github.com/jbenet/go-ipfs/p2p/net/services/identify"
mux "github.com/jbenet/go-ipfs/p2p/net/services/mux"
relay "github.com/jbenet/go-ipfs/p2p/net/services/relay"
swarm "github.com/jbenet/go-ipfs/p2p/net/swarm"
context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
ctxgroup "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-ctxgroup"
ma "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr"
eventlog "github.com/jbenet/go-ipfs/util/eventlog"
)
var log = eventlog.Logger("net/mux")
type stream swarm.Stream
func (s *stream) SwarmStream() *swarm.Stream {
return (*swarm.Stream)(s)
}
// Conn returns the connection this stream is part of.
func (s *stream) Conn() inet.Conn {
c := s.SwarmStream().Conn()
return (*conn_)(c)
}
// Conn returns the connection this stream is part of.
func (s *stream) Close() error {
return s.SwarmStream().Close()
}
// Read reads bytes from a stream.
func (s *stream) Read(p []byte) (n int, err error) {
return s.SwarmStream().Read(p)
}
// Write writes bytes to a stream, flushing for each call.
func (s *stream) Write(p []byte) (n int, err error) {
return s.SwarmStream().Write(p)
}
type conn_ swarm.Conn
func (s *conn_) String() string {
return s.SwarmConn().String()
}
func (c *conn_) SwarmConn() *swarm.Conn {
return (*swarm.Conn)(c)
}
func (c *conn_) NewStreamWithProtocol(pr inet.ProtocolID) (inet.Stream, error) {
s, err := (*swarm.Conn)(c).NewStream()
if err != nil {
return nil, err
}
ss := (*stream)(s)
if err := mux.WriteProtocolHeader(pr, ss); err != nil {
ss.Close()
return nil, err
}
return ss, nil
}
func (c *conn_) LocalMultiaddr() ma.Multiaddr {
return c.SwarmConn().LocalMultiaddr()
}
func (c *conn_) RemoteMultiaddr() ma.Multiaddr {
return c.SwarmConn().RemoteMultiaddr()
}
func (c *conn_) LocalPeer() peer.ID {
return c.SwarmConn().LocalPeer()
}
func (c *conn_) RemotePeer() peer.ID {
return c.SwarmConn().RemotePeer()
}
func (c *conn_) LocalPrivateKey() ic.PrivKey {
return c.SwarmConn().LocalPrivateKey()
}
func (c *conn_) RemotePublicKey() ic.PubKey {
return c.SwarmConn().RemotePublicKey()
}
// Network implements the inet.Network interface.
// It uses a swarm to connect to remote hosts.
type Network struct {
local peer.ID // local peer
ps peer.Peerstore
swarm *swarm.Swarm // peer connection multiplexing
mux mux.Mux // protocol multiplexing
ids *ids.IDService
relay *relay.RelayService
cg ctxgroup.ContextGroup // for Context closing
}
// NewNetwork constructs a new network and starts listening on given addresses.
func NewNetwork(ctx context.Context, listen []ma.Multiaddr, local peer.ID,
peers peer.Peerstore) (*Network, error) {
s, err := swarm.NewSwarm(ctx, listen, local, peers)
if err != nil {
return nil, err
}
n := &Network{
local: local,
swarm: s,
mux: mux.Mux{Handlers: inet.StreamHandlerMap{}},
cg: ctxgroup.WithContext(ctx),
ps: peers,
}
n.cg.SetTeardown(n.close)
n.cg.AddChildGroup(s.CtxGroup())
s.SetStreamHandler(func(s *swarm.Stream) {
n.mux.Handle((*stream)(s))
})
// setup ProtocolIdentify to immediately "asks the other side about them"
n.ids = ids.NewIDService(n)
s.SetConnHandler(n.newConnHandler)
// setup ProtocolRelay to allow traffic relaying.
// Feed things we get for ourselves into the muxer.
n.relay = relay.NewRelayService(n.cg.Context(), n, n.mux.HandleSync)
return n, nil
}
func (n *Network) newConnHandler(c *swarm.Conn) {
cc := (*conn_)(c)
n.ids.IdentifyConn(cc)
}
// DialPeer attempts to establish a connection to a given peer.
// Respects the context.
func (n *Network) DialPeer(ctx context.Context, p peer.ID) error {
log.Debugf("[%s] network dialing peer [%s]", n.local, p)
sc, err := n.swarm.Dial(ctx, p)
if err != nil {
return err
}
// identify the connection before returning.
done := make(chan struct{})
go func() {
n.ids.IdentifyConn((*conn_)(sc))
close(done)
}()
// respect don contexteone
select {
case <-done:
case <-ctx.Done():
return ctx.Err()
}
log.Debugf("network for %s finished dialing %s", n.local, p)
return nil
}
// Protocols returns the ProtocolIDs of all the registered handlers.
func (n *Network) Protocols() []inet.ProtocolID {
return n.mux.Protocols()
}
// CtxGroup returns the network's ContextGroup
func (n *Network) CtxGroup() ctxgroup.ContextGroup {
return n.cg
}
// Swarm returns the network's peerstream.Swarm
func (n *Network) Swarm() *swarm.Swarm {
return n.Swarm()
}
// LocalPeer the network's LocalPeer
func (n *Network) LocalPeer() peer.ID {
return n.swarm.LocalPeer()
}
// Peers returns the connected peers
func (n *Network) Peers() []peer.ID {
return n.swarm.Peers()
}
// Peers returns the connected peers
func (n *Network) Peerstore() peer.Peerstore {
return n.ps
}
// Conns returns the connected peers
func (n *Network) Conns() []inet.Conn {
conns1 := n.swarm.Connections()
out := make([]inet.Conn, len(conns1))
for i, c := range conns1 {
out[i] = (*conn_)(c)
}
return out
}
// ConnsToPeer returns the connections in this Netowrk for given peer.
func (n *Network) ConnsToPeer(p peer.ID) []inet.Conn {
conns1 := n.swarm.ConnectionsToPeer(p)
out := make([]inet.Conn, len(conns1))
for i, c := range conns1 {
out[i] = (*conn_)(c)
}
return out
}
// ClosePeer connection to peer
func (n *Network) ClosePeer(p peer.ID) error {
return n.swarm.CloseConnection(p)
}
// close is the real teardown function
func (n *Network) close() error {
return n.swarm.Close()
}
// Close calls the ContextCloser func
func (n *Network) Close() error {
return n.cg.Close()
}
// BandwidthTotals returns the total amount of bandwidth transferred
func (n *Network) BandwidthTotals() (in uint64, out uint64) {
// need to implement this. probably best to do it in swarm this time.
// need a "metrics" object
return 0, 0
}
// ListenAddresses returns a list of addresses at which this network listens.
func (n *Network) ListenAddresses() []ma.Multiaddr {
return n.swarm.ListenAddresses()
}
// InterfaceListenAddresses returns a list of addresses at which this network
// listens. It expands "any interface" addresses (/ip4/0.0.0.0, /ip6/::) to
// use the known local interfaces.
func (n *Network) InterfaceListenAddresses() ([]ma.Multiaddr, error) {
return swarm.InterfaceListenAddresses(n.swarm)
}
// Connectedness returns a state signaling connection capabilities
// For now only returns Connected || NotConnected. Expand into more later.
func (n *Network) Connectedness(p peer.ID) inet.Connectedness {
c := n.swarm.ConnectionsToPeer(p)
if c != nil && len(c) > 0 {
return inet.Connected
}
return inet.NotConnected
}
// NewStream returns a new stream to given peer p.
// If there is no connection to p, attempts to create one.
// If ProtocolID is "", writes no header.
func (n *Network) NewStream(pr inet.ProtocolID, p peer.ID) (inet.Stream, error) {
log.Debugf("[%s] network opening stream to peer [%s]: %s", n.local, p, pr)
s, err := n.swarm.NewStreamWithPeer(p)
if err != nil {
return nil, err
}
ss := (*stream)(s)
if err := mux.WriteProtocolHeader(pr, ss); err != nil {
ss.Close()
return nil, err
}
return ss, nil
}
// SetHandler sets the protocol handler on the Network's Muxer.
// This operation is threadsafe.
func (n *Network) SetHandler(p inet.ProtocolID, h inet.StreamHandler) {
n.mux.SetHandler(p, h)
}
// String returns a string representation of Network.
func (n *Network) String() string {
return fmt.Sprintf("<Network %s>", n.LocalPeer())
}
// IdentifyProtocol returns the network's IDService
func (n *Network) IdentifyProtocol() *ids.IDService {
return n.ids
}
package net_test
import (
"fmt"
"testing"
"time"
context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
inet "github.com/jbenet/go-ipfs/p2p/net"
netutil "github.com/jbenet/go-ipfs/p2p/net/swarmnet/util"
)
// TestConnectednessCorrect starts a few networks, connects a few
// and tests Connectedness value is correct.
func TestConnectednessCorrect(t *testing.T) {
ctx := context.Background()
nets := make([]inet.Network, 4)
for i := 0; i < 4; i++ {
nets[i] = netutil.GenNetwork(t, ctx)
}
// connect 0-1, 0-2, 0-3, 1-2, 2-3
dial := func(a, b inet.Network) {
netutil.DivulgeAddresses(b, a)
if err := a.DialPeer(ctx, b.LocalPeer()); err != nil {
t.Fatalf("Failed to dial: %s", err)
}
}
dial(nets[0], nets[1])
dial(nets[0], nets[3])
dial(nets[1], nets[2])
dial(nets[3], nets[2])
// there's something wrong with dial, i think. it's not finishing
// completely. there must be some async stuff.
<-time.After(100 * time.Millisecond)
// test those connected show up correctly
// test connected
expectConnectedness(t, nets[0], nets[1], inet.Connected)
expectConnectedness(t, nets[0], nets[3], inet.Connected)
expectConnectedness(t, nets[1], nets[2], inet.Connected)
expectConnectedness(t, nets[3], nets[2], inet.Connected)
// test not connected
expectConnectedness(t, nets[0], nets[2], inet.NotConnected)
expectConnectedness(t, nets[1], nets[3], inet.NotConnected)
for _, n := range nets {
n.Close()
}
}
func expectConnectedness(t *testing.T, a, b inet.Network, expected inet.Connectedness) {
es := "%s is connected to %s, but Connectedness incorrect. %s %s"
if a.Connectedness(b.LocalPeer()) != expected {
t.Errorf(es, a, b, printConns(a), printConns(b))
}
// test symmetric case
if b.Connectedness(a.LocalPeer()) != expected {
t.Errorf(es, b, a, printConns(b), printConns(a))
}
}
func printConns(n inet.Network) string {
s := fmt.Sprintf("Connections in %s:\n", n)
for _, c := range n.Conns() {
s = s + fmt.Sprintf("- %s\n", c)
}
return s
}
package testutil
import (
"testing"
inet "github.com/jbenet/go-ipfs/p2p/net"
sn "github.com/jbenet/go-ipfs/p2p/net/swarmnet"
peer "github.com/jbenet/go-ipfs/p2p/peer"
tu "github.com/jbenet/go-ipfs/util/testutil"
context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
)
func GenNetwork(t *testing.T, ctx context.Context) *sn.Network {
p := tu.RandPeerNetParamsOrFatal(t)
ps := peer.NewPeerstore()
ps.AddAddress(p.ID, p.Addr)
ps.AddPubKey(p.ID, p.PubKey)
ps.AddPrivKey(p.ID, p.PrivKey)
n, err := sn.NewNetwork(ctx, ps.Addresses(p.ID), p.ID, ps)
if err != nil {
t.Fatal(err)
}
return n
}
func DivulgeAddresses(a, b inet.Network) {
id := a.LocalPeer()
addrs := a.Peerstore().Addresses(id)
b.Peerstore().AddAddresses(id, addrs)
}
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment