Commit 93655b42 authored by Juan Batiz-Benet's avatar Juan Batiz-Benet
Browse files

p2p/net: notify on listens

Network now signals when it successfully listens on some address
or when an address shuts down. This will be used to establish and
close nat port mappings. It could also be used to notify peers
of address changes.
parent 320d0609
...@@ -142,6 +142,8 @@ const ( ...@@ -142,6 +142,8 @@ const (
// Notifiee is an interface for an object wishing to receive // Notifiee is an interface for an object wishing to receive
// notifications from a Network. // notifications from a Network.
type Notifiee interface { type Notifiee interface {
Listen(Network, ma.Multiaddr) // called when network starts listening on an addr
ListenClose(Network, ma.Multiaddr) // called when network starts listening on an addr
Connected(Network, Conn) // called when a connection opened Connected(Network, Conn) // called when a connection opened
Disconnected(Network, Conn) // called when a connection closed Disconnected(Network, Conn) // called when a connection closed
OpenedStream(Network, Stream) // called when a stream opened OpenedStream(Network, Stream) // called when a stream opened
......
...@@ -4,9 +4,10 @@ import ( ...@@ -4,9 +4,10 @@ import (
"testing" "testing"
"time" "time"
inet "github.com/jbenet/go-ipfs/p2p/net"
context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context" context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
ma "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr"
inet "github.com/jbenet/go-ipfs/p2p/net"
) )
func TestNotifications(t *testing.T) { func TestNotifications(t *testing.T) {
...@@ -169,6 +170,8 @@ func TestNotifications(t *testing.T) { ...@@ -169,6 +170,8 @@ func TestNotifications(t *testing.T) {
} }
type netNotifiee struct { type netNotifiee struct {
listen chan ma.Multiaddr
listenClose chan ma.Multiaddr
connected chan inet.Conn connected chan inet.Conn
disconnected chan inet.Conn disconnected chan inet.Conn
openedStream chan inet.Stream openedStream chan inet.Stream
...@@ -177,6 +180,8 @@ type netNotifiee struct { ...@@ -177,6 +180,8 @@ type netNotifiee struct {
func newNetNotifiee() *netNotifiee { func newNetNotifiee() *netNotifiee {
return &netNotifiee{ return &netNotifiee{
listen: make(chan ma.Multiaddr),
listenClose: make(chan ma.Multiaddr),
connected: make(chan inet.Conn), connected: make(chan inet.Conn),
disconnected: make(chan inet.Conn), disconnected: make(chan inet.Conn),
openedStream: make(chan inet.Stream), openedStream: make(chan inet.Stream),
...@@ -184,6 +189,12 @@ func newNetNotifiee() *netNotifiee { ...@@ -184,6 +189,12 @@ func newNetNotifiee() *netNotifiee {
} }
} }
func (nn *netNotifiee) Listen(n inet.Network, a ma.Multiaddr) {
nn.listen <- a
}
func (nn *netNotifiee) ListenClose(n inet.Network, a ma.Multiaddr) {
nn.listenClose <- a
}
func (nn *netNotifiee) Connected(n inet.Network, v inet.Conn) { func (nn *netNotifiee) Connected(n inet.Network, v inet.Conn) {
nn.connected <- v nn.connected <- v
} }
......
...@@ -202,6 +202,15 @@ func (s *Swarm) LocalPeer() peer.ID { ...@@ -202,6 +202,15 @@ func (s *Swarm) LocalPeer() peer.ID {
return s.local return s.local
} }
// notifyAll sends a signal to all Notifiees
func (s *Swarm) notifyAll(notify func(inet.Notifiee)) {
s.notifmu.RLock()
for f := range s.notifs {
go notify(f)
}
s.notifmu.RUnlock()
}
// Notify signs up Notifiee to receive signals when events happen // Notify signs up Notifiee to receive signals when events happen
func (s *Swarm) Notify(f inet.Notifiee) { func (s *Swarm) Notify(f inet.Notifiee) {
// wrap with our notifiee, to translate function calls // wrap with our notifiee, to translate function calls
......
...@@ -382,7 +382,7 @@ func (s *Swarm) dialAddrs(ctx context.Context, d *conn.Dialer, p peer.ID, remote ...@@ -382,7 +382,7 @@ func (s *Swarm) dialAddrs(ctx context.Context, d *conn.Dialer, p peer.ID, remote
for i := 0; i < len(remoteAddrs); i++ { for i := 0; i < len(remoteAddrs); i++ {
select { select {
case err = <-errs: case err = <-errs:
log.Info(err) log.Debug(err)
case connC := <-conns: case connC := <-conns:
// take the first + return asap // take the first + return asap
close(foundConn) close(foundConn)
......
...@@ -3,6 +3,7 @@ package swarm ...@@ -3,6 +3,7 @@ package swarm
import ( import (
"fmt" "fmt"
inet "github.com/jbenet/go-ipfs/p2p/net"
conn "github.com/jbenet/go-ipfs/p2p/net/conn" conn "github.com/jbenet/go-ipfs/p2p/net/conn"
addrutil "github.com/jbenet/go-ipfs/p2p/net/swarm/addr" addrutil "github.com/jbenet/go-ipfs/p2p/net/swarm/addr"
lgbl "github.com/jbenet/go-ipfs/util/eventlog/loggables" lgbl "github.com/jbenet/go-ipfs/util/eventlog/loggables"
...@@ -60,7 +61,7 @@ func (s *Swarm) setupListener(maddr ma.Multiaddr) error { ...@@ -60,7 +61,7 @@ func (s *Swarm) setupListener(maddr ma.Multiaddr) error {
// may be fine for sk to be nil, just log a warning. // may be fine for sk to be nil, just log a warning.
log.Warning("Listener not given PrivateKey, so WILL NOT SECURE conns.") log.Warning("Listener not given PrivateKey, so WILL NOT SECURE conns.")
} }
log.Infof("Swarm Listening at %s", maddr) log.Debugf("Swarm Listening at %s", maddr)
list, err := conn.Listen(s.cg.Context(), maddr, s.local, sk) list, err := conn.Listen(s.cg.Context(), maddr, s.local, sk)
if err != nil { if err != nil {
return err return err
...@@ -72,20 +73,31 @@ func (s *Swarm) setupListener(maddr ma.Multiaddr) error { ...@@ -72,20 +73,31 @@ func (s *Swarm) setupListener(maddr ma.Multiaddr) error {
if err != nil { if err != nil {
return err return err
} }
log.Infof("Swarm Listeners at %s", s.ListenAddresses()) log.Debugf("Swarm Listeners at %s", s.ListenAddresses())
// signal to our notifiees on successful conn.
s.notifyAll(func(n inet.Notifiee) {
n.Listen((*Network)(s), maddr)
})
// go consume peerstream's listen accept errors. note, these ARE errors. // go consume peerstream's listen accept errors. note, these ARE errors.
// they may be killing the listener, and if we get _any_ we should be // they may be killing the listener, and if we get _any_ we should be
// fixing this in our conn.Listener (to ignore them or handle them // fixing this in our conn.Listener (to ignore them or handle them
// differently.) // differently.)
go func(ctx context.Context, sl *ps.Listener) { go func(ctx context.Context, sl *ps.Listener) {
// signal to our notifiees closing
defer s.notifyAll(func(n inet.Notifiee) {
n.ListenClose((*Network)(s), maddr)
})
for { for {
select { select {
case err, more := <-sl.AcceptErrors(): case err, more := <-sl.AcceptErrors():
if !more { if !more {
return return
} }
log.Info(err) log.Debugf("swarm listener accept error: %s", err)
case <-ctx.Done(): case <-ctx.Done():
return return
} }
......
...@@ -4,9 +4,10 @@ import ( ...@@ -4,9 +4,10 @@ import (
"testing" "testing"
"time" "time"
inet "github.com/jbenet/go-ipfs/p2p/net"
context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context" context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
ma "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr"
inet "github.com/jbenet/go-ipfs/p2p/net"
) )
func TestNotifications(t *testing.T) { func TestNotifications(t *testing.T) {
...@@ -157,6 +158,8 @@ func TestNotifications(t *testing.T) { ...@@ -157,6 +158,8 @@ func TestNotifications(t *testing.T) {
} }
type netNotifiee struct { type netNotifiee struct {
listen chan ma.Multiaddr
listenClose chan ma.Multiaddr
connected chan inet.Conn connected chan inet.Conn
disconnected chan inet.Conn disconnected chan inet.Conn
openedStream chan inet.Stream openedStream chan inet.Stream
...@@ -165,6 +168,8 @@ type netNotifiee struct { ...@@ -165,6 +168,8 @@ type netNotifiee struct {
func newNetNotifiee() *netNotifiee { func newNetNotifiee() *netNotifiee {
return &netNotifiee{ return &netNotifiee{
listen: make(chan ma.Multiaddr),
listenClose: make(chan ma.Multiaddr),
connected: make(chan inet.Conn), connected: make(chan inet.Conn),
disconnected: make(chan inet.Conn), disconnected: make(chan inet.Conn),
openedStream: make(chan inet.Stream), openedStream: make(chan inet.Stream),
...@@ -172,6 +177,12 @@ func newNetNotifiee() *netNotifiee { ...@@ -172,6 +177,12 @@ func newNetNotifiee() *netNotifiee {
} }
} }
func (nn *netNotifiee) Listen(n inet.Network, a ma.Multiaddr) {
nn.listen <- a
}
func (nn *netNotifiee) ListenClose(n inet.Network, a ma.Multiaddr) {
nn.listenClose <- a
}
func (nn *netNotifiee) Connected(n inet.Network, v inet.Conn) { func (nn *netNotifiee) Connected(n inet.Network, v inet.Conn) {
nn.connected <- v nn.connected <- v
} }
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment