Commit 08c8d6f7 authored by Jeromy Johnson's avatar Jeromy Johnson Committed by GitHub
Browse files

Merge pull request #197 from libp2p/feat/refactor-basichost-new

basichost: refactor BasicHost construction
parents 78dabda4 f0453a24
...@@ -21,23 +21,31 @@ import ( ...@@ -21,23 +21,31 @@ import (
var log = logging.Logger("basichost") var log = logging.Logger("basichost")
var NegotiateTimeout = time.Second * 60 var (
// DefaultNegotiationTimeout is the default value for HostOpts.NegotiationTimeout.
DefaultNegotiationTimeout = time.Second * 60
// DefaultAddrsFactory is the default value for HostOpts.AddrsFactory.
DefaultAddrsFactory = func(addrs []ma.Multiaddr) []ma.Multiaddr { return addrs }
)
// AddrsFactory functions can be passed to New in order to override // AddrsFactory functions can be passed to New in order to override
// addresses returned by Addrs. // addresses returned by Addrs.
type AddrsFactory func([]ma.Multiaddr) []ma.Multiaddr type AddrsFactory func([]ma.Multiaddr) []ma.Multiaddr
// Option is a type used to pass in options to the host. // Option is a type used to pass in options to the host.
//
// Deprecated in favor of HostOpts and NewHost.
type Option int type Option int
const ( // NATPortMap makes the host attempt to open port-mapping in NAT devices
// NATPortMap makes the host attempt to open port-mapping in NAT devices // for all its listeners. Pass in this option in the constructor to
// for all its listeners. Pass in this option in the constructor to // asynchronously a) find a gateway, b) open port mappings, c) republish
// asynchronously a) find a gateway, b) open port mappings, c) republish // port mappings periodically. The NATed addresses are included in the
// port mappings periodically. The NATed addresses are included in the // Host's Addrs() list.
// Host's Addrs() list. //
NATPortMap Option = iota // This option is deprecated in favor of HostOpts and NewHost.
) const NATPortMap Option = iota
// BasicHost is the basic implementation of the host.Host interface. This // BasicHost is the basic implementation of the host.Host interface. This
// particular host implementation: // particular host implementation:
...@@ -51,55 +59,116 @@ type BasicHost struct { ...@@ -51,55 +59,116 @@ type BasicHost struct {
natmgr *natManager natmgr *natManager
addrs AddrsFactory addrs AddrsFactory
NegotiateTimeout time.Duration negtimeout time.Duration
proc goprocess.Process proc goprocess.Process
bwc metrics.Reporter bwc metrics.Reporter
} }
// New constructs and sets up a new *BasicHost with given Network // HostOpts holds options that can be passed to NewHost in order to
func New(net inet.Network, opts ...interface{}) *BasicHost { // customize construction of the *BasicHost.
type HostOpts struct {
// MultistreamMuxer is essential for the *BasicHost and will use a sensible default value if omitted.
MultistreamMuxer *msmux.MultistreamMuxer
// NegotiationTimeout determines the read and write timeouts on streams.
// If 0 or omitted, it will use DefaultNegotiationTimeout.
// If below 0, timeouts on streams will be deactivated.
NegotiationTimeout time.Duration
// IdentifyService holds an implementation of the /ipfs/id/ protocol.
// If omitted, a new *identify.IDService will be used.
IdentifyService *identify.IDService
// AddrsFactory holds a function which can be used to override or filter the result of Addrs.
// If omitted, there's no override or filtering, and the results of Addrs and AllAddrs are the same.
AddrsFactory AddrsFactory
// NATManager takes care of setting NAT port mappings, and discovering external addresses.
// If omitted, this will simply be disabled.
//
// TODO: Currently the NATManager can only be enabled by calling New,
// since the underlying struct and functions are still private.
// Once they are public, NATManager can be used through NewHost as well.
NATManager *natManager
//
BandwidthReporter metrics.Reporter
}
// NewHost constructs a new *BasicHost and activates it by attaching its stream and connection handlers to the given inet.Network.
func NewHost(net inet.Network, opts *HostOpts) *BasicHost {
h := &BasicHost{ h := &BasicHost{
network: net, network: net,
mux: msmux.NewMultistreamMuxer(), mux: msmux.NewMultistreamMuxer(),
NegotiateTimeout: NegotiateTimeout, negtimeout: DefaultNegotiationTimeout,
addrs: DefaultAddrsFactory,
}
if opts.MultistreamMuxer != nil {
h.mux = opts.MultistreamMuxer
}
if opts.IdentifyService != nil {
h.ids = opts.IdentifyService
} else {
// we can't set this as a default above because it depends on the *BasicHost.
h.ids = identify.NewIDService(h)
}
if uint64(opts.NegotiationTimeout) != 0 {
h.negtimeout = opts.NegotiationTimeout
}
if opts.AddrsFactory != nil {
h.addrs = opts.AddrsFactory
}
if opts.NATManager != nil {
h.natmgr = opts.NATManager
}
if opts.BandwidthReporter != nil {
h.bwc = opts.BandwidthReporter
h.ids.Reporter = opts.BandwidthReporter
} }
h.proc = goprocess.WithTeardown(func() error { h.proc = goprocess.WithTeardown(func() error {
if h.natmgr != nil { if h.natmgr != nil {
h.natmgr.Close() h.natmgr.Close()
} }
return h.Network().Close() return h.Network().Close()
}) })
// setup host services net.SetConnHandler(h.newConnHandler)
h.ids = identify.NewIDService(h) net.SetStreamHandler(h.newStreamHandler)
return h
}
// default addresses factory, can be overridden via opts argument // New constructs and sets up a new *BasicHost with given Network and options.
h.addrs = func(addrs []ma.Multiaddr) []ma.Multiaddr { return addrs } // Three options can be passed: NATPortMap, AddrsFactory, and metrics.Reporter.
// This function is deprecated in favor of NewHost and HostOpts.
func New(net inet.Network, opts ...interface{}) *BasicHost {
hostopts := &HostOpts{}
for _, o := range opts { for _, o := range opts {
switch o := o.(type) { switch o := o.(type) {
case Option: case Option:
switch o { switch o {
case NATPortMap: case NATPortMap:
h.natmgr = newNatManager(h) hostopts.NATManager = newNatManager(net)
} }
case metrics.Reporter: case metrics.Reporter:
h.bwc = o hostopts.BandwidthReporter = o
case AddrsFactory: case AddrsFactory:
h.addrs = AddrsFactory(o) hostopts.AddrsFactory = AddrsFactory(o)
} }
} }
h.ids.Reporter = h.bwc return NewHost(net, hostopts)
net.SetConnHandler(h.newConnHandler)
net.SetStreamHandler(h.newStreamHandler)
return h
} }
// newConnHandler is the remote-opened conn handler for inet.Network // newConnHandler is the remote-opened conn handler for inet.Network
...@@ -115,8 +184,8 @@ func (h *BasicHost) newConnHandler(c inet.Conn) { ...@@ -115,8 +184,8 @@ func (h *BasicHost) newConnHandler(c inet.Conn) {
func (h *BasicHost) newStreamHandler(s inet.Stream) { func (h *BasicHost) newStreamHandler(s inet.Stream) {
before := time.Now() before := time.Now()
if h.NegotiateTimeout != 0 { if h.negtimeout > 0 {
if err := s.SetDeadline(time.Now().Add(h.NegotiateTimeout)); err != nil { if err := s.SetDeadline(time.Now().Add(h.negtimeout)); err != nil {
log.Error("setting stream deadline: ", err) log.Error("setting stream deadline: ", err)
s.Close() s.Close()
return return
...@@ -144,7 +213,7 @@ func (h *BasicHost) newStreamHandler(s inet.Stream) { ...@@ -144,7 +213,7 @@ func (h *BasicHost) newStreamHandler(s inet.Stream) {
rw: lzc, rw: lzc,
} }
if h.NegotiateTimeout != 0 { if h.negtimeout > 0 {
if err := s.SetDeadline(time.Time{}); err != nil { if err := s.SetDeadline(time.Time{}); err != nil {
log.Error("resetting stream deadline: ", err) log.Error("resetting stream deadline: ", err)
s.Close() s.Close()
......
...@@ -19,7 +19,7 @@ import ( ...@@ -19,7 +19,7 @@ import (
// as the network signals Listen() or ListenClose(). // as the network signals Listen() or ListenClose().
// * closing the natManager closes the nat and its mappings. // * closing the natManager closes the nat and its mappings.
type natManager struct { type natManager struct {
host *BasicHost net inet.Network
natmu sync.RWMutex // guards nat (ready could obviate this mutex, but safety first.) natmu sync.RWMutex // guards nat (ready could obviate this mutex, but safety first.)
nat *inat.NAT nat *inat.NAT
...@@ -27,23 +27,18 @@ type natManager struct { ...@@ -27,23 +27,18 @@ type natManager struct {
proc goprocess.Process // natManager has a process + children. can be closed. proc goprocess.Process // natManager has a process + children. can be closed.
} }
func newNatManager(host *BasicHost) *natManager { func newNatManager(net inet.Network) *natManager {
nmgr := &natManager{ nmgr := &natManager{
host: host, net: net,
ready: make(chan struct{}), ready: make(chan struct{}),
proc: goprocess.WithParent(host.proc),
} }
// teardown
nmgr.proc = goprocess.WithTeardown(func() error { nmgr.proc = goprocess.WithTeardown(func() error {
// on closing, unregister from network notifications. // on closing, unregister from network notifications.
host.Network().StopNotify((*nmgrNetNotifiee)(nmgr)) net.StopNotify((*nmgrNetNotifiee)(nmgr))
return nil return nil
}) })
// host is our parent. close when host closes.
host.proc.AddChild(nmgr.proc)
// discover the nat. // discover the nat.
nmgr.discoverNAT() nmgr.discoverNAT()
return nmgr return nmgr
...@@ -108,13 +103,13 @@ func (nmgr *natManager) discoverNAT() { ...@@ -108,13 +103,13 @@ func (nmgr *natManager) discoverNAT() {
// sign natManager up for network notifications // sign natManager up for network notifications
// we need to sign up here to avoid missing some notifs // we need to sign up here to avoid missing some notifs
// before the NAT has been found. // before the NAT has been found.
nmgr.host.Network().Notify((*nmgrNetNotifiee)(nmgr)) nmgr.net.Notify((*nmgrNetNotifiee)(nmgr))
// if any interfaces were brought up while we were setting up // if any interfaces were brought up while we were setting up
// the nat, now is the time to setup port mappings for them. // the nat, now is the time to setup port mappings for them.
// we release ready, then grab them to avoid losing any. adding // we release ready, then grab them to avoid losing any. adding
// a port mapping is idempotent, so its ok to add the same twice. // a port mapping is idempotent, so its ok to add the same twice.
addrs := nmgr.host.Network().ListenAddresses() addrs := nmgr.net.ListenAddresses()
for _, addr := range addrs { for _, addr := range addrs {
// we do it async because it's slow and we may want to close beforehand // we do it async because it's slow and we may want to close beforehand
go addPortMapping(nmgr, addr) go addPortMapping(nmgr, addr)
......
...@@ -84,8 +84,10 @@ func (mn *mocknet) AddPeerWithPeerstore(p peer.ID, ps pstore.Peerstore) (host.Ho ...@@ -84,8 +84,10 @@ func (mn *mocknet) AddPeerWithPeerstore(p peer.ID, ps pstore.Peerstore) (host.Ho
return nil, err return nil, err
} }
h := bhost.New(n) opts := &bhost.HostOpts{
h.NegotiateTimeout = 0 NegotiationTimeout: -1,
}
h := bhost.NewHost(n, opts)
mn.proc.AddChild(n.proc) mn.proc.AddChild(n.proc)
......
...@@ -55,6 +55,8 @@ type IDService struct { ...@@ -55,6 +55,8 @@ type IDService struct {
observedAddrs ObservedAddrSet observedAddrs ObservedAddrSet
} }
// NewIDService constructs a new *IDService and activates it by
// attaching its stream handler to the given host.Host.
func NewIDService(h host.Host) *IDService { func NewIDService(h host.Host) *IDService {
s := &IDService{ s := &IDService{
Host: h, Host: h,
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment