Commit 44309cd5 authored by Jeromy's avatar Jeromy
Browse files

bandwidth metering on streams

humanize bandwidth output

instrument conn.Conn for bandwidth metrics

add poll command for continuous bandwidth reporting

move bandwidth tracking onto multiaddr net connections

another mild refactor of recording locations

address concerns from PR

lower mock nodes in race test due to increased goroutines per connection
parent ac98d23b
......@@ -4,6 +4,8 @@ import (
ma "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr"
goprocess "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/goprocess"
context "github.com/ipfs/go-ipfs/Godeps/_workspace/src/golang.org/x/net/context"
metrics "github.com/ipfs/go-ipfs/metrics"
mstream "github.com/ipfs/go-ipfs/metrics/stream"
eventlog "github.com/ipfs/go-ipfs/thirdparty/eventlog"
inet "github.com/ipfs/go-ipfs/p2p/net"
......@@ -41,13 +43,16 @@ type BasicHost struct {
natmgr *natManager
proc goprocess.Process
bwc metrics.Reporter
}
// New constructs and sets up a new *BasicHost with given Network
func New(net inet.Network, opts ...Option) *BasicHost {
func New(net inet.Network, opts ...interface{}) *BasicHost {
h := &BasicHost{
network: net,
mux: protocol.NewMux(),
bwc: metrics.NewBandwidthCounter(),
}
h.proc = goprocess.WithTeardown(func() error {
......@@ -62,16 +67,21 @@ func New(net inet.Network, opts ...Option) *BasicHost {
h.ids = identify.NewIDService(h)
h.relay = relay.NewRelayService(h, h.Mux().HandleSync)
net.SetConnHandler(h.newConnHandler)
net.SetStreamHandler(h.newStreamHandler)
for _, o := range opts {
switch o {
case NATPortMap:
h.natmgr = newNatManager(h)
switch o := o.(type) {
case Option:
switch o {
case NATPortMap:
h.natmgr = newNatManager(h)
}
case metrics.Reporter:
h.bwc = o
}
}
net.SetConnHandler(h.newConnHandler)
net.SetStreamHandler(h.newStreamHandler)
return h
}
......@@ -81,8 +91,17 @@ func (h *BasicHost) newConnHandler(c inet.Conn) {
}
// newStreamHandler is the remote-opened stream handler for inet.Network
// TODO: this feels a bit wonky
func (h *BasicHost) newStreamHandler(s inet.Stream) {
h.Mux().Handle(s)
protoID, handle, err := h.Mux().ReadHeader(s)
if err != nil {
log.Error("protocol mux failed: %s", err)
return
}
logStream := mstream.WrapStream(s, protoID, h.bwc)
go handle(logStream)
}
// ID returns the (local) peer.ID associated with this Host
......@@ -131,12 +150,14 @@ func (h *BasicHost) NewStream(pid protocol.ID, p peer.ID) (inet.Stream, error) {
return nil, err
}
if err := protocol.WriteHeader(s, pid); err != nil {
s.Close()
logStream := mstream.WrapStream(s, pid, h.bwc)
if err := protocol.WriteHeader(logStream, pid); err != nil {
logStream.Close()
return nil, err
}
return s, nil
return logStream, nil
}
// Connect ensures there is a connection between this host and the peer with
......@@ -210,3 +231,7 @@ func (h *BasicHost) Addrs() []ma.Multiaddr {
func (h *BasicHost) Close() error {
return h.proc.Close()
}
func (h *BasicHost) GetBandwidthReporter() metrics.Reporter {
return h.bwc
}
......@@ -3,6 +3,7 @@ package host
import (
ma "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr"
context "github.com/ipfs/go-ipfs/Godeps/_workspace/src/golang.org/x/net/context"
metrics "github.com/ipfs/go-ipfs/metrics"
inet "github.com/ipfs/go-ipfs/p2p/net"
peer "github.com/ipfs/go-ipfs/p2p/peer"
protocol "github.com/ipfs/go-ipfs/p2p/protocol"
......@@ -57,4 +58,6 @@ type Host interface {
// Close shuts down the host, its Network, and services.
Close() error
GetBandwidthReporter() metrics.Reporter
}
......@@ -9,6 +9,7 @@ import (
eventlog "github.com/ipfs/go-ipfs/thirdparty/eventlog"
lgbl "github.com/ipfs/go-ipfs/util/eventlog/loggables"
metrics "github.com/ipfs/go-ipfs/metrics"
host "github.com/ipfs/go-ipfs/p2p/host"
inet "github.com/ipfs/go-ipfs/p2p/net"
peer "github.com/ipfs/go-ipfs/p2p/peer"
......@@ -115,3 +116,7 @@ func (rh *RoutedHost) Close() error {
// no need to close IpfsRouting. we dont own it.
return rh.host.Close()
}
func (rh *RoutedHost) GetBandwidthReporter() metrics.Reporter {
return rh.host.GetBandwidthReporter()
}
......@@ -50,6 +50,10 @@ func (d *Dialer) Dial(ctx context.Context, raddr ma.Multiaddr, remote peer.ID) (
return
}
if d.Wrapper != nil {
maconn = d.Wrapper(maconn)
}
c, err := newSingleConn(ctx, d.LocalPeer, remote, maconn)
if err != nil {
maconn.Close()
......
......@@ -66,6 +66,9 @@ type Dialer struct {
// PrivateKey used to initialize a secure connection.
// Warning: if PrivateKey is nil, connection will not be secured.
PrivateKey ic.PrivKey
// Wrapper to wrap the raw connection (optional)
Wrapper func(manet.Conn) manet.Conn
}
// Listener is an object that can accept connections. It matches net.Listener
......
......@@ -16,6 +16,9 @@ import (
peer "github.com/ipfs/go-ipfs/p2p/peer"
)
// ConnWrapper is any function that wraps a raw multiaddr connection
type ConnWrapper func(manet.Conn) manet.Conn
// listener is an object that can accept connections. It implements Listener
type listener struct {
manet.Listener
......@@ -23,6 +26,8 @@ type listener struct {
local peer.ID // LocalPeer is the identity of the local Peer
privk ic.PrivKey // private key to use to initialize secure conns
wrapper ConnWrapper
cg ctxgroup.ContextGroup
}
......@@ -76,6 +81,11 @@ func (l *listener) Accept() (net.Conn, error) {
}
log.Debugf("listener %s got connection: %s <---> %s", l, maconn.LocalMultiaddr(), maconn.RemoteMultiaddr())
// If we have a wrapper func, wrap this conn
if l.wrapper != nil {
maconn = l.wrapper(maconn)
}
c, err := newSingleConn(ctx, l.local, "", maconn)
if err != nil {
if catcher.IsTemporary(err) {
......@@ -143,6 +153,16 @@ func Listen(ctx context.Context, addr ma.Multiaddr, local peer.ID, sk ic.PrivKey
return l, nil
}
type ListenerConnWrapper interface {
SetConnWrapper(ConnWrapper)
}
// SetConnWrapper assigns a maconn ConnWrapper to wrap all incoming
// connections with. MUST be set _before_ calling `Accept()`
func (l *listener) SetConnWrapper(cw ConnWrapper) {
l.wrapper = cw
}
func manetListen(addr ma.Multiaddr) (manet.Listener, error) {
network, naddr, err := manet.DialArgs(addr)
if err != nil {
......
......@@ -12,6 +12,7 @@ import (
protocol "github.com/ipfs/go-ipfs/p2p/protocol"
testutil "github.com/ipfs/go-ipfs/util/testutil"
detectrace "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-detect-race"
context "github.com/ipfs/go-ipfs/Godeps/_workspace/src/golang.org/x/net/context"
)
......@@ -358,8 +359,12 @@ func makePonger(st string) func(inet.Stream) {
}
func TestStreamsStress(t *testing.T) {
nnodes := 100
if detectrace.WithRace() {
nnodes = 50
}
mn, err := FullMeshConnected(context.Background(), 100)
mn, err := FullMeshConnected(context.Background(), nnodes)
if err != nil {
t.Fatal(err)
}
......
......@@ -7,6 +7,7 @@ import (
"sync"
"time"
metrics "github.com/ipfs/go-ipfs/metrics"
inet "github.com/ipfs/go-ipfs/p2p/net"
addrutil "github.com/ipfs/go-ipfs/p2p/net/swarm/addr"
peer "github.com/ipfs/go-ipfs/p2p/peer"
......@@ -42,12 +43,13 @@ type Swarm struct {
notifmu sync.RWMutex
notifs map[inet.Notifiee]ps.Notifiee
cg ctxgroup.ContextGroup
cg ctxgroup.ContextGroup
bwc metrics.Reporter
}
// NewSwarm constructs a Swarm, with a Chan.
func NewSwarm(ctx context.Context, listenAddrs []ma.Multiaddr,
local peer.ID, peers peer.Peerstore) (*Swarm, error) {
local peer.ID, peers peer.Peerstore, bwc metrics.Reporter) (*Swarm, error) {
listenAddrs, err := filterAddrs(listenAddrs)
if err != nil {
......@@ -61,6 +63,7 @@ func NewSwarm(ctx context.Context, listenAddrs []ma.Multiaddr,
cg: ctxgroup.WithContext(ctx),
dialT: DialTimeout,
notifs: make(map[inet.Notifiee]ps.Notifiee),
bwc: bwc,
}
// configure Swarm
......
......@@ -3,6 +3,7 @@ package swarm
import (
"testing"
metrics "github.com/ipfs/go-ipfs/metrics"
addrutil "github.com/ipfs/go-ipfs/p2p/net/swarm/addr"
peer "github.com/ipfs/go-ipfs/p2p/peer"
testutil "github.com/ipfs/go-ipfs/util/testutil"
......@@ -65,11 +66,11 @@ func TestFilterAddrs(t *testing.T) {
ps := peer.NewPeerstore()
ctx := context.Background()
if _, err := NewNetwork(ctx, bad, id, ps); err == nil {
if _, err := NewNetwork(ctx, bad, id, ps, metrics.NewBandwidthCounter()); err == nil {
t.Fatal("should have failed to create swarm")
}
if _, err := NewNetwork(ctx, goodAndBad, id, ps); err != nil {
if _, err := NewNetwork(ctx, goodAndBad, id, ps, metrics.NewBandwidthCounter()); err != nil {
t.Fatal("should have succeeded in creating swarm", err)
}
}
......
......@@ -8,6 +8,7 @@ import (
"sync"
"time"
mconn "github.com/ipfs/go-ipfs/metrics/conn"
conn "github.com/ipfs/go-ipfs/p2p/net/conn"
addrutil "github.com/ipfs/go-ipfs/p2p/net/swarm/addr"
peer "github.com/ipfs/go-ipfs/p2p/peer"
......@@ -318,6 +319,9 @@ func (s *Swarm) dial(ctx context.Context, p peer.ID) (*Conn, error) {
LocalPeer: s.local,
LocalAddrs: localAddrs,
PrivateKey: sk,
Wrapper: func(c manet.Conn) manet.Conn {
return mconn.WrapConn(s.bwc, c)
},
}
// try to get a connection to any addr
......
......@@ -3,12 +3,14 @@ package swarm
import (
"fmt"
mconn "github.com/ipfs/go-ipfs/metrics/conn"
inet "github.com/ipfs/go-ipfs/p2p/net"
conn "github.com/ipfs/go-ipfs/p2p/net/conn"
addrutil "github.com/ipfs/go-ipfs/p2p/net/swarm/addr"
lgbl "github.com/ipfs/go-ipfs/util/eventlog/loggables"
ma "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr"
manet "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr-net"
ps "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-peerstream"
context "github.com/ipfs/go-ipfs/Godeps/_workspace/src/golang.org/x/net/context"
multierr "github.com/ipfs/go-ipfs/thirdparty/multierr"
......@@ -67,6 +69,12 @@ func (s *Swarm) setupListener(maddr ma.Multiaddr) error {
return err
}
if cw, ok := list.(conn.ListenerConnWrapper); ok {
cw.SetConnWrapper(func(c manet.Conn) manet.Conn {
return mconn.WrapConn(s.bwc, c)
})
}
// AddListener to the peerstream Listener. this will begin accepting connections
// and streams!
sl, err := s.swarm.AddListener(list)
......
......@@ -5,6 +5,7 @@ import (
peer "github.com/ipfs/go-ipfs/p2p/peer"
metrics "github.com/ipfs/go-ipfs/metrics"
inet "github.com/ipfs/go-ipfs/p2p/net"
ctxgroup "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-ctxgroup"
......@@ -19,9 +20,9 @@ type Network Swarm
// NewNetwork constructs a new network and starts listening on given addresses.
func NewNetwork(ctx context.Context, listen []ma.Multiaddr, local peer.ID,
peers peer.Peerstore) (*Network, error) {
peers peer.Peerstore, bwc metrics.Reporter) (*Network, error) {
s, err := NewSwarm(ctx, listen, local, peers)
s, err := NewSwarm(ctx, listen, local, peers, bwc)
if err != nil {
return nil, err
}
......
......@@ -7,6 +7,7 @@ import (
"testing"
"time"
metrics "github.com/ipfs/go-ipfs/metrics"
inet "github.com/ipfs/go-ipfs/p2p/net"
peer "github.com/ipfs/go-ipfs/p2p/peer"
errors "github.com/ipfs/go-ipfs/util/debugerror"
......@@ -58,7 +59,7 @@ func makeSwarms(ctx context.Context, t *testing.T, num int) []*Swarm {
peerstore.AddPrivKey(localnp.ID, localnp.PrivKey)
addrs := []ma.Multiaddr{localnp.Addr}
swarm, err := NewSwarm(ctx, addrs, localnp.ID, peerstore)
swarm, err := NewSwarm(ctx, addrs, localnp.ID, peerstore, metrics.NewBandwidthCounter())
if err != nil {
t.Fatal(err)
}
......
......@@ -9,6 +9,7 @@ import (
ma "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr"
context "github.com/ipfs/go-ipfs/Godeps/_workspace/src/golang.org/x/net/context"
mstream "github.com/ipfs/go-ipfs/metrics/stream"
host "github.com/ipfs/go-ipfs/p2p/host"
inet "github.com/ipfs/go-ipfs/p2p/net"
peer "github.com/ipfs/go-ipfs/p2p/peer"
......@@ -80,6 +81,8 @@ func (ids *IDService) IdentifyConn(c inet.Conn) {
log.Debugf("error opening initial stream for %s", ID)
log.Event(context.TODO(), "IdentifyOpenFailed", c.RemotePeer())
} else {
bwc := ids.Host.GetBandwidthReporter()
s = mstream.WrapStream(s, ID, bwc)
// ok give the response to our handler.
if err := protocol.WriteHeader(s, ID); err != nil {
......@@ -106,6 +109,9 @@ func (ids *IDService) RequestHandler(s inet.Stream) {
defer s.Close()
c := s.Conn()
bwc := ids.Host.GetBandwidthReporter()
s = mstream.WrapStream(s, ID, bwc)
w := ggio.NewDelimitedWriter(s)
mes := pb.Identify{}
ids.populateMessage(&mes, s.Conn())
......
......@@ -45,9 +45,9 @@ func (m *Mux) Protocols() []ID {
return l
}
// readHeader reads the stream and returns the next Handler function
// ReadHeader reads the stream and returns the next Handler function
// according to the muxer encoding.
func (m *Mux) readHeader(s io.Reader) (ID, inet.StreamHandler, error) {
func (m *Mux) ReadHeader(s io.Reader) (ID, inet.StreamHandler, error) {
p, err := ReadHeader(s)
if err != nil {
return "", nil, err
......@@ -110,7 +110,7 @@ func (m *Mux) Handle(s inet.Stream) {
func (m *Mux) HandleSync(s inet.Stream) {
ctx := context.Background()
name, handler, err := m.readHeader(s)
name, handler, err := m.ReadHeader(s)
if err != nil {
err = fmt.Errorf("protocol mux error: %s", err)
log.Event(ctx, "muxError", lgbl.Error(err))
......
......@@ -3,6 +3,7 @@ package testutil
import (
"testing"
metrics "github.com/ipfs/go-ipfs/metrics"
bhost "github.com/ipfs/go-ipfs/p2p/host/basic"
inet "github.com/ipfs/go-ipfs/p2p/net"
swarm "github.com/ipfs/go-ipfs/p2p/net/swarm"
......@@ -18,7 +19,7 @@ func GenSwarmNetwork(t *testing.T, ctx context.Context) *swarm.Network {
ps := peer.NewPeerstore()
ps.AddPubKey(p.ID, p.PubKey)
ps.AddPrivKey(p.ID, p.PrivKey)
n, err := swarm.NewNetwork(ctx, []ma.Multiaddr{p.Addr}, p.ID, ps)
n, err := swarm.NewNetwork(ctx, []ma.Multiaddr{p.Addr}, p.ID, ps, metrics.NewBandwidthCounter())
if err != nil {
t.Fatal(err)
}
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment