Commit 0f4e8fb4 authored by Juan Batiz-Benet's avatar Juan Batiz-Benet
Browse files

AddrManager: use addr manager with smarter TTLs

This addr manager should seriously help with the addrsplosion
problem.
parent 5c3146fd
......@@ -144,7 +144,7 @@ func (h *BasicHost) NewStream(pid protocol.ID, p peer.ID) (inet.Stream, error) {
func (h *BasicHost) Connect(ctx context.Context, pi peer.PeerInfo) error {
// absorb addresses into peerstore
h.Peerstore().AddPeerInfo(pi)
h.Peerstore().AddAddrs(pi.ID, pi.Addrs, peer.TempAddrTTL)
cs := h.Network().ConnsToPeer(pi.ID)
if len(cs) > 0 {
......@@ -189,6 +189,10 @@ func (h *BasicHost) Addrs() []ma.Multiaddr {
log.Debug("error retrieving network interface addrs")
}
if h.ids != nil { // add external observed addresses
addrs = append(addrs, h.ids.OwnObservedAddrs()...)
}
if h.natmgr != nil { // natmgr is nil if we do not use nat option.
nat := h.natmgr.NAT()
if nat != nil { // nat is nil if not ready, or no nat is available.
......
......@@ -33,8 +33,8 @@ func (l *link) newConnPair(dialer *peernet) (*conn, *conn) {
c.local = ln.peer
c.remote = rn.peer
c.localAddr = ln.ps.Addresses(ln.peer)[0]
c.remoteAddr = rn.ps.Addresses(rn.peer)[0]
c.localAddr = ln.ps.Addrs(ln.peer)[0]
c.remoteAddr = rn.ps.Addrs(rn.peer)[0]
c.localPrivKey = ln.ps.PrivKey(ln.peer)
c.remotePubKey = rn.ps.PubKey(rn.peer)
......
......@@ -49,7 +49,7 @@ func newPeernet(ctx context.Context, m *mocknet, k ic.PrivKey,
// create our own entirely, so that peers knowledge doesn't get shared
ps := peer.NewPeerstore()
ps.AddAddress(p, a)
ps.AddAddr(p, a, peer.PermanentAddrTTL)
ps.AddPrivKey(p, k)
ps.AddPubKey(p, k.GetPublic())
......@@ -307,13 +307,13 @@ func (pn *peernet) BandwidthTotals() (in uint64, out uint64) {
// Listen tells the network to start listening on given multiaddrs.
func (pn *peernet) Listen(addrs ...ma.Multiaddr) error {
pn.Peerstore().AddAddresses(pn.LocalPeer(), addrs)
pn.Peerstore().AddAddrs(pn.LocalPeer(), addrs, peer.PermanentAddrTTL)
return nil
}
// ListenAddresses returns a list of addresses at which this network listens.
func (pn *peernet) ListenAddresses() []ma.Multiaddr {
return pn.Peerstore().Addresses(pn.LocalPeer())
return pn.Peerstore().Addrs(pn.LocalPeer())
}
// InterfaceListenAddresses returns a list of addresses at which this network
......
......@@ -48,7 +48,7 @@ func TestSimultDials(t *testing.T) {
connect := func(s *Swarm, dst peer.ID, addr ma.Multiaddr) {
// copy for other peer
log.Debugf("TestSimultOpen: connecting: %s --> %s (%s)", s.local, dst, addr)
s.peers.AddAddress(dst, addr)
s.peers.AddAddr(dst, addr, peer.TempAddrTTL)
if _, err := s.Dial(ctx, dst); err != nil {
t.Fatal("error swarm dialing to peer", err)
}
......@@ -125,7 +125,7 @@ func TestDialWait(t *testing.T) {
s2p, s2addr, s2l := newSilentPeer(t)
go acceptAndHang(s2l)
defer s2l.Close()
s1.peers.AddAddress(s2p, s2addr)
s1.peers.AddAddr(s2p, s2addr, peer.PermanentAddrTTL)
before := time.Now()
if c, err := s1.Dial(ctx, s2p); err == nil {
......@@ -171,13 +171,13 @@ func TestDialBackoff(t *testing.T) {
if err != nil {
t.Fatal(err)
}
s1.peers.AddAddresses(s2.local, s2addrs)
s1.peers.AddAddrs(s2.local, s2addrs, peer.PermanentAddrTTL)
// dial to a non-existent peer.
s3p, s3addr, s3l := newSilentPeer(t)
go acceptAndHang(s3l)
defer s3l.Close()
s1.peers.AddAddress(s3p, s3addr)
s1.peers.AddAddr(s3p, s3addr, peer.PermanentAddrTTL)
// in this test we will:
// 1) dial 10x to each node.
......@@ -389,7 +389,7 @@ func TestDialBackoffClears(t *testing.T) {
defer s2l.Close()
// phase 1 -- dial to non-operational addresses
s1.peers.AddAddress(s2.local, s2bad)
s1.peers.AddAddr(s2.local, s2bad, peer.PermanentAddrTTL)
before := time.Now()
if c, err := s1.Dial(ctx, s2.local); err == nil {
......@@ -419,7 +419,7 @@ func TestDialBackoffClears(t *testing.T) {
if err != nil {
t.Fatal(err)
}
s1.peers.AddAddresses(s2.local, ifaceAddrs1)
s1.peers.AddAddrs(s2.local, ifaceAddrs1, peer.PermanentAddrTTL)
before = time.Now()
if c, err := s1.Dial(ctx, s2.local); err != nil {
......
......@@ -19,7 +19,7 @@ func TestPeers(t *testing.T) {
connect := func(s *Swarm, dst peer.ID, addr ma.Multiaddr) {
// TODO: make a DialAddr func.
s.peers.AddAddress(dst, addr)
s.peers.AddAddr(dst, addr, peer.PermanentAddrTTL)
// t.Logf("connections from %s", s.LocalPeer())
// for _, c := range s.ConnectionsToPeer(dst) {
// t.Logf("connection from %s to %s: %v", s.LocalPeer(), dst, c)
......
......@@ -25,7 +25,7 @@ func TestSimultOpen(t *testing.T) {
connect := func(s *Swarm, dst peer.ID, addr ma.Multiaddr) {
// copy for other peer
log.Debugf("TestSimultOpen: connecting: %s --> %s (%s)", s.local, dst, addr)
s.peers.AddAddress(dst, addr)
s.peers.AddAddr(dst, addr, peer.PermanentAddrTTL)
if _, err := s.Dial(ctx, dst); err != nil {
t.Fatal("error swarm dialing to peer", err)
}
......
......@@ -110,7 +110,7 @@ func TestDialBadAddrs(t *testing.T) {
test := func(a ma.Multiaddr) {
p := testutil.RandPeerIDFatal(t)
s.peers.AddAddress(p, a)
s.peers.AddAddr(p, a, peer.PermanentAddrTTL)
if _, err := s.Dial(ctx, p); err == nil {
t.Error("swarm should not dial: %s", m)
}
......
......@@ -289,14 +289,14 @@ func (s *Swarm) dial(ctx context.Context, p peer.ID) (*Conn, error) {
}
// get remote peer addrs
remoteAddrs := s.peers.Addresses(p)
remoteAddrs := s.peers.Addrs(p)
// make sure we can use the addresses.
remoteAddrs = addrutil.FilterUsableAddrs(remoteAddrs)
// drop out any addrs that would just dial ourselves. use ListenAddresses
// as that is a more authoritative view than localAddrs.
ila, _ := s.InterfaceListenAddresses()
remoteAddrs = addrutil.Subtract(remoteAddrs, ila)
remoteAddrs = addrutil.Subtract(remoteAddrs, s.peers.Addresses(s.local))
remoteAddrs = addrutil.Subtract(remoteAddrs, s.peers.Addrs(s.local))
log.Debugf("%s swarm dialing %s -- remote:%s local:%s", s.local, p, remoteAddrs, s.ListenAddresses())
if len(remoteAddrs) == 0 {
err := errors.New("peer has no addresses")
......
......@@ -53,7 +53,7 @@ func (s *Swarm) setupListener(maddr ma.Multiaddr) error {
// return err
// }
// for _, a := range resolved {
// s.peers.AddAddress(s.local, a)
// s.peers.AddAddr(s.local, a)
// }
sk := s.peers.PrivKey(s.local)
......
......@@ -75,7 +75,7 @@ func connectSwarms(t *testing.T, ctx context.Context, swarms []*Swarm) {
var wg sync.WaitGroup
connect := func(s *Swarm, dst peer.ID, addr ma.Multiaddr) {
// TODO: make a DialAddr func.
s.peers.AddAddress(dst, addr)
s.peers.AddAddr(dst, addr, peer.PermanentAddrTTL)
if _, err := s.Dial(ctx, dst); err != nil {
t.Fatal("error swarm dialing to peer", err)
}
......
// package addr provides useful address utilities for p2p
// applications. It buys into the multi-transport addressing
// scheme Multiaddr, and uses it to build its own p2p addressing.
// All Addrs must have an associated peer.ID.
package addr
package peer
import (
"sync"
"time"
ma "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr"
)
const (
// TempAddrTTL is the ttl used for a short lived address
TempAddrTTL = time.Second * 10
// ProviderAddrTTL is the TTL of an address we've received from a provider.
// This is also a temporary address, but lasts longer. After this expires,
// the records we return will require an extra lookup.
ProviderAddrTTL = time.Minute * 10
peer "github.com/jbenet/go-ipfs/p2p/peer"
// RecentlyConnectedAddrTTL is used when we recently connected to a peer.
// It means that we are reasonably certain of the peer's address.
RecentlyConnectedAddrTTL = time.Minute * 10
// OwnObservedAddrTTL is used for our own external addresses observed by peers.
OwnObservedAddrTTL = time.Minute * 20
// PermanentAddrTTL is the ttl for a "permanent address" (e.g. bootstrap nodes)
// if we haven't shipped you an update to ipfs in 356 days
// we probably arent running the same bootstrap nodes...
PermanentAddrTTL = time.Hour * 24 * 356
// ConnectedAddrTTL is the ttl used for the addresses of a peer to whom
// we're connected directly. This is basically permanent, as we will
// clear them + re-add under a TempAddrTTL after disconnecting.
ConnectedAddrTTL = PermanentAddrTTL
)
type expiringAddr struct {
......@@ -24,30 +46,44 @@ func (e *expiringAddr) ExpiredBy(t time.Time) bool {
type addrSet map[string]expiringAddr
// Manager manages addresses.
// AddrManager manages addresses.
// The zero-value is ready to be used.
type Manager struct {
type AddrManager struct {
addrmu sync.Mutex // guards addrs
addrs map[peer.ID]addrSet
addrs map[ID]addrSet
}
// ensures the Manager is initialized.
// ensures the AddrManager is initialized.
// So we can use the zero value.
func (mgr *Manager) init() {
func (mgr *AddrManager) init() {
if mgr.addrs == nil {
mgr.addrs = make(map[ID]addrSet)
}
}
func (mgr *AddrManager) Peers() []ID {
mgr.addrmu.Lock()
defer mgr.addrmu.Unlock()
if mgr.addrs == nil {
mgr.addrs = make(map[peer.ID]addrSet)
return nil
}
pids := make([]ID, 0, len(mgr.addrs))
for pid := range mgr.addrs {
pids = append(pids, pid)
}
return pids
}
// AddAddr calls AddAddrs(p, []ma.Multiaddr{addr}, ttl)
func (mgr *Manager) AddAddr(p peer.ID, addr ma.Multiaddr, ttl time.Duration) {
func (mgr *AddrManager) AddAddr(p ID, addr ma.Multiaddr, ttl time.Duration) {
mgr.AddAddrs(p, []ma.Multiaddr{addr}, ttl)
}
// AddAddrs gives Manager addresses to use, with a given ttl
// AddAddrs gives AddrManager addresses to use, with a given ttl
// (time-to-live), after which the address is no longer valid.
// If the manager has a longer TTL, the operation is a no-op for that address
func (mgr *Manager) AddAddrs(p peer.ID, addrs []ma.Multiaddr, ttl time.Duration) {
func (mgr *AddrManager) AddAddrs(p ID, addrs []ma.Multiaddr, ttl time.Duration) {
mgr.addrmu.Lock()
defer mgr.addrmu.Unlock()
......@@ -77,13 +113,13 @@ func (mgr *Manager) AddAddrs(p peer.ID, addrs []ma.Multiaddr, ttl time.Duration)
}
// SetAddr calls mgr.SetAddrs(p, addr, ttl)
func (mgr *Manager) SetAddr(p peer.ID, addr ma.Multiaddr, ttl time.Duration) {
func (mgr *AddrManager) SetAddr(p ID, addr ma.Multiaddr, ttl time.Duration) {
mgr.SetAddrs(p, []ma.Multiaddr{addr}, ttl)
}
// SetAddrs sets the ttl on addresses. This clears any TTL there previously.
// This is used when we receive the best estimate of the validity of an address.
func (mgr *Manager) SetAddrs(p peer.ID, addrs []ma.Multiaddr, ttl time.Duration) {
func (mgr *AddrManager) SetAddrs(p ID, addrs []ma.Multiaddr, ttl time.Duration) {
mgr.addrmu.Lock()
defer mgr.addrmu.Unlock()
......@@ -109,8 +145,8 @@ func (mgr *Manager) SetAddrs(p peer.ID, addrs []ma.Multiaddr, ttl time.Duration)
}
}
// Addresses returns all known (and valid) addresses for a given peer.
func (mgr *Manager) Addrs(p peer.ID) []ma.Multiaddr {
// Addresses returns all known (and valid) addresses for a given
func (mgr *AddrManager) Addrs(p ID) []ma.Multiaddr {
mgr.addrmu.Lock()
defer mgr.addrmu.Unlock()
......@@ -143,7 +179,7 @@ func (mgr *Manager) Addrs(p peer.ID) []ma.Multiaddr {
}
// ClearAddresses removes all previously stored addresses
func (mgr *Manager) ClearAddrs(p peer.ID) {
func (mgr *AddrManager) ClearAddrs(p ID) {
mgr.addrmu.Lock()
defer mgr.addrmu.Unlock()
mgr.init()
......
package addr
package peer
import (
"testing"
"time"
ma "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr"
peer "github.com/jbenet/go-ipfs/p2p/peer"
)
func IDS(t *testing.T, ids string) peer.ID {
id, err := peer.IDB58Decode(ids)
func IDS(t *testing.T, ids string) ID {
id, err := IDB58Decode(ids)
if err != nil {
t.Fatal(err)
}
......@@ -71,7 +69,7 @@ func TestAddresses(t *testing.T) {
ma55 := MA(t, "/ip4/5.2.3.3/tcp/5555")
ttl := time.Hour
m := Manager{}
m := AddrManager{}
m.AddAddr(id1, ma11, ttl)
m.AddAddrs(id2, []ma.Multiaddr{ma21, ma22}, ttl)
......@@ -109,7 +107,7 @@ func TestAddressesExpire(t *testing.T) {
ma24 := MA(t, "/ip4/4.2.3.3/tcp/4444")
ma25 := MA(t, "/ip4/5.2.3.3/tcp/5555")
m := Manager{}
m := AddrManager{}
m.AddAddr(id1, ma11, time.Hour)
m.AddAddr(id1, ma12, time.Hour)
m.AddAddr(id1, ma13, time.Hour)
......@@ -164,7 +162,7 @@ func TestClearWorks(t *testing.T) {
ma24 := MA(t, "/ip4/4.2.3.3/tcp/4444")
ma25 := MA(t, "/ip4/5.2.3.3/tcp/5555")
m := Manager{}
m := AddrManager{}
m.AddAddr(id1, ma11, time.Hour)
m.AddAddr(id1, ma12, time.Hour)
m.AddAddr(id1, ma13, time.Hour)
......
......@@ -20,8 +20,8 @@ const (
// Peerstore provides a threadsafe store of Peer related
// information.
type Peerstore interface {
AddrBook
KeyBook
AddressBook
Metrics
// Peers returns a list of all peer.IDs in this Peerstore
......@@ -32,9 +32,6 @@ type Peerstore interface {
// that peer, useful to other services.
PeerInfo(ID) PeerInfo
// AddPeerInfo absorbs the information listed in given PeerInfo.
AddPeerInfo(PeerInfo)
// Get/Put is a simple registry for other peer-related key/value pairs.
// if we find something we use often, it should become its own set of
// methods. this is a last resort.
......@@ -42,109 +39,30 @@ type Peerstore interface {
Put(id ID, key string, val interface{}) error
}
// AddressBook tracks the addresses of Peers
type AddressBook interface {
Addresses(ID) []ma.Multiaddr // returns addresses for ID
AddAddress(ID, ma.Multiaddr) // Adds given addr for ID
AddAddresses(ID, []ma.Multiaddr) // Adds given addrs for ID
SetAddresses(ID, []ma.Multiaddr) // Sets given addrs for ID (clears previously stored)
}
// AddrBook is an interface that fits the new AddrManager. I'm patching
// it up in here to avoid changing a ton of the codebase.
type AddrBook interface {
type expiringAddr struct {
Addr ma.Multiaddr
TTL time.Time
}
// AddAddr calls AddAddrs(p, []ma.Multiaddr{addr}, ttl)
AddAddr(p ID, addr ma.Multiaddr, ttl time.Duration)
func (e *expiringAddr) Expired() bool {
return time.Now().After(e.TTL)
}
// AddAddrs gives AddrManager addresses to use, with a given ttl
// (time-to-live), after which the address is no longer valid.
// If the manager has a longer TTL, the operation is a no-op for that address
AddAddrs(p ID, addrs []ma.Multiaddr, ttl time.Duration)
type addressMap map[string]expiringAddr
// SetAddr calls mgr.SetAddrs(p, addr, ttl)
SetAddr(p ID, addr ma.Multiaddr, ttl time.Duration)
type addressbook struct {
sync.RWMutex // guards all fields
// SetAddrs sets the ttl on addresses. This clears any TTL there previously.
// This is used when we receive the best estimate of the validity of an address.
SetAddrs(p ID, addrs []ma.Multiaddr, ttl time.Duration)
addrs map[ID]addressMap
ttl time.Duration // initial ttl
}
// Addresses returns all known (and valid) addresses for a given
Addrs(p ID) []ma.Multiaddr
func newAddressbook() *addressbook {
return &addressbook{
addrs: map[ID]addressMap{},
ttl: AddressTTL,
}
}
func (ab *addressbook) Peers() []ID {
ab.RLock()
ps := make([]ID, 0, len(ab.addrs))
for p := range ab.addrs {
ps = append(ps, p)
}
ab.RUnlock()
return ps
}
func (ab *addressbook) Addresses(p ID) []ma.Multiaddr {
ab.Lock()
defer ab.Unlock()
maddrs, found := ab.addrs[p]
if !found {
return nil
}
good := make([]ma.Multiaddr, 0, len(maddrs))
var expired []string
for s, m := range maddrs {
if m.Expired() {
expired = append(expired, s)
} else {
good = append(good, m.Addr)
}
}
// clean up the expired ones.
for _, s := range expired {
delete(ab.addrs[p], s)
}
return good
}
func (ab *addressbook) AddAddress(p ID, m ma.Multiaddr) {
ab.AddAddresses(p, []ma.Multiaddr{m})
}
func (ab *addressbook) AddAddresses(p ID, ms []ma.Multiaddr) {
ab.Lock()
defer ab.Unlock()
amap, found := ab.addrs[p]
if !found {
amap = addressMap{}
ab.addrs[p] = amap
}
ttl := time.Now().Add(ab.ttl)
for _, m := range ms {
// re-set all of them for new ttl.
amap[m.String()] = expiringAddr{
Addr: m,
TTL: ttl,
}
}
}
func (ab *addressbook) SetAddresses(p ID, ms []ma.Multiaddr) {
ab.Lock()
defer ab.Unlock()
amap := addressMap{}
ttl := time.Now().Add(ab.ttl)
for _, m := range ms {
amap[m.String()] = expiringAddr{Addr: m, TTL: ttl}
}
ab.addrs[p] = amap // clear what was there before
// ClearAddresses removes all previously stored addresses
ClearAddrs(p ID)
}
// KeyBook tracks the Public keys of Peers.
......@@ -231,8 +149,8 @@ func (kb *keybook) AddPrivKey(p ID, sk ic.PrivKey) error {
type peerstore struct {
keybook
addressbook
metrics
AddrManager
// store other data, like versions
ds ds.ThreadSafeDatastore
......@@ -242,8 +160,8 @@ type peerstore struct {
func NewPeerstore() Peerstore {
return &peerstore{
keybook: *newKeybook(),
addressbook: *newAddressbook(),
metrics: *(NewMetrics()).(*metrics),
AddrManager: AddrManager{},
ds: dssync.MutexWrap(ds.NewMapDatastore()),
}
}
......@@ -263,7 +181,7 @@ func (ps *peerstore) Peers() []ID {
for _, p := range ps.keybook.Peers() {
set[p] = struct{}{}
}
for _, p := range ps.addressbook.Peers() {
for _, p := range ps.AddrManager.Peers() {
set[p] = struct{}{}
}
......@@ -277,14 +195,10 @@ func (ps *peerstore) Peers() []ID {
func (ps *peerstore) PeerInfo(p ID) PeerInfo {
return PeerInfo{
ID: p,
Addrs: ps.addressbook.Addresses(p),
Addrs: ps.AddrManager.Addrs(p),
}
}
func (ps *peerstore) AddPeerInfo(pi PeerInfo) {
ps.AddAddresses(pi.ID, pi.Addrs)
}
func PeerInfos(ps Peerstore, peers []ID) []PeerInfo {
pi := make([]PeerInfo, len(peers))
for i, p := range peers {
......
package peer
import (
"testing"
"time"
ma "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr"
)
func IDS(t *testing.T, ids string) ID {
id, err := IDB58Decode(ids)
if err != nil {
t.Fatal(err)
}
return id
}
func MA(t *testing.T, m string) ma.Multiaddr {
maddr, err := ma.NewMultiaddr(m)
if err != nil {
t.Fatal(err)
}
return maddr
}
func TestAddresses(t *testing.T) {
ps := NewPeerstore()
id1 := IDS(t, "QmcNstKuwBBoVTpSCSDrwzjgrRcaYXK833Psuz2EMHwyQN")
id2 := IDS(t, "QmRmPL3FDZKE3Qiwv1RosLdwdvbvg17b2hB39QPScgWKKZ")
id3 := IDS(t, "QmPhi7vBsChP7sjRoZGgg7bcKqF6MmCcQwvRbDte8aJ6Kn")
id4 := IDS(t, "QmPhi7vBsChP7sjRoZGgg7bcKqF6MmCcQwvRbDte8aJ5Kn")
id5 := IDS(t, "QmPhi7vBsChP7sjRoZGgg7bcKqF6MmCcQwvRbDte8aJ5Km")
ma11 := MA(t, "/ip4/1.2.3.1/tcp/1111")
ma21 := MA(t, "/ip4/2.2.3.2/tcp/1111")
ma22 := MA(t, "/ip4/2.2.3.2/tcp/2222")
ma31 := MA(t, "/ip4/3.2.3.3/tcp/1111")
ma32 := MA(t, "/ip4/3.2.3.3/tcp/2222")
ma33 := MA(t, "/ip4/3.2.3.3/tcp/3333")
ma41 := MA(t, "/ip4/4.2.3.3/tcp/1111")
ma42 := MA(t, "/ip4/4.2.3.3/tcp/2222")
ma43 := MA(t, "/ip4/4.2.3.3/tcp/3333")
ma44 := MA(t, "/ip4/4.2.3.3/tcp/4444")
ma51 := MA(t, "/ip4/5.2.3.3/tcp/1111")
ma52 := MA(t, "/ip4/5.2.3.3/tcp/2222")
ma53 := MA(t, "/ip4/5.2.3.3/tcp/3333")
ma54 := MA(t, "/ip4/5.2.3.3/tcp/4444")
ma55 := MA(t, "/ip4/5.2.3.3/tcp/5555")
ps.AddAddress(id1, ma11)
ps.AddAddresses(id2, []ma.Multiaddr{ma21, ma22})
ps.AddAddresses(id2, []ma.Multiaddr{ma21, ma22}) // idempotency
ps.AddAddress(id3, ma31)
ps.AddAddress(id3, ma32)
ps.AddAddress(id3, ma33)
ps.AddAddress(id3, ma33) // idempotency
ps.AddAddress(id3, ma33)
ps.AddAddresses(id4, []ma.Multiaddr{ma41, ma42, ma43, ma44}) // multiple
ps.AddAddresses(id5, []ma.Multiaddr{ma21, ma22}) // clearing
ps.AddAddresses(id5, []ma.Multiaddr{ma41, ma42, ma43, ma44}) // clearing
ps.SetAddresses(id5, []ma.Multiaddr{ma51, ma52, ma53, ma54, ma55}) // clearing
test := func(exp, act []ma.Multiaddr) {
if len(exp) != len(act) {
t.Fatal("lengths not the same")
}
for _, a := range exp {
found := false
for _, b := range act {
if a.Equal(b) {
found = true
break
}
}
if !found {
t.Fatal("expected address %s not found", a)
}
}
}
// test the Addresses return value
test([]ma.Multiaddr{ma11}, ps.Addresses(id1))
test([]ma.Multiaddr{ma21, ma22}, ps.Addresses(id2))
test([]ma.Multiaddr{ma31, ma32, ma33}, ps.Addresses(id3))
test([]ma.Multiaddr{ma41, ma42, ma43, ma44}, ps.Addresses(id4))
test([]ma.Multiaddr{ma51, ma52, ma53, ma54, ma55}, ps.Addresses(id5))
// test also the PeerInfo return
test([]ma.Multiaddr{ma11}, ps.PeerInfo(id1).Addrs)
test([]ma.Multiaddr{ma21, ma22}, ps.PeerInfo(id2).Addrs)
test([]ma.Multiaddr{ma31, ma32, ma33}, ps.PeerInfo(id3).Addrs)
test([]ma.Multiaddr{ma41, ma42, ma43, ma44}, ps.PeerInfo(id4).Addrs)
test([]ma.Multiaddr{ma51, ma52, ma53, ma54, ma55}, ps.PeerInfo(id5).Addrs)
}
func TestAddressTTL(t *testing.T) {
ps := NewPeerstore()
id1 := IDS(t, "QmcNstKuwBBoVTpSCSDrwzjgrRcaYXK833Psuz2EMHwyQN")
ma1 := MA(t, "/ip4/1.2.3.1/tcp/1111")
ma2 := MA(t, "/ip4/2.2.3.2/tcp/2222")
ma3 := MA(t, "/ip4/3.2.3.3/tcp/3333")
ma4 := MA(t, "/ip4/4.2.3.3/tcp/4444")
ma5 := MA(t, "/ip4/5.2.3.3/tcp/5555")
ps.AddAddress(id1, ma1)
ps.AddAddress(id1, ma2)
ps.AddAddress(id1, ma3)
ps.AddAddress(id1, ma4)
ps.AddAddress(id1, ma5)
test := func(exp, act []ma.Multiaddr) {
if len(exp) != len(act) {
t.Fatal("lengths not the same")
}
for _, a := range exp {
found := false
for _, b := range act {
if a.Equal(b) {
found = true
break
}
}
if !found {
t.Fatal("expected address %s not found", a)
}
}
}
testTTL := func(ttle time.Duration, id ID, addr ma.Multiaddr) {
ab := ps.(*peerstore).addressbook
ttlat := ab.addrs[id][addr.String()].TTL
ttla := ttlat.Sub(time.Now())
if ttla > ttle {
t.Error("ttl is greater than expected", ttle, ttla)
}
if ttla < (ttle / 2) {
t.Error("ttl is smaller than expected", ttle/2, ttla)
}
}
// should they are there
ab := ps.(*peerstore).addressbook
if len(ab.addrs[id1]) != 5 {
t.Error("incorrect addr count", len(ab.addrs[id1]), ab.addrs[id1])
}
// test the Addresses return value
test([]ma.Multiaddr{ma1, ma2, ma3, ma4, ma5}, ps.Addresses(id1))
test([]ma.Multiaddr{ma1, ma2, ma3, ma4, ma5}, ps.PeerInfo(id1).Addrs)
// check the addr TTL is a bit smaller than the init TTL
testTTL(AddressTTL, id1, ma1)
testTTL(AddressTTL, id1, ma2)
testTTL(AddressTTL, id1, ma3)
testTTL(AddressTTL, id1, ma4)
testTTL(AddressTTL, id1, ma5)
// change the TTL
setTTL := func(id ID, addr ma.Multiaddr, ttl time.Time) {
a := ab.addrs[id][addr.String()]
a.TTL = ttl
ab.addrs[id][addr.String()] = a
}
setTTL(id1, ma1, time.Now().Add(-1*time.Second))
setTTL(id1, ma2, time.Now().Add(-1*time.Hour))
setTTL(id1, ma3, time.Now().Add(-1*AddressTTL))
// should no longer list those
test([]ma.Multiaddr{ma4, ma5}, ps.Addresses(id1))
test([]ma.Multiaddr{ma4, ma5}, ps.PeerInfo(id1).Addrs)
// should no longer be there
if len(ab.addrs[id1]) != 2 {
t.Error("incorrect addr count", len(ab.addrs[id1]), ab.addrs[id1])
}
}
......@@ -11,6 +11,7 @@ import (
host "github.com/jbenet/go-ipfs/p2p/host"
inet "github.com/jbenet/go-ipfs/p2p/net"
peer "github.com/jbenet/go-ipfs/p2p/peer"
protocol "github.com/jbenet/go-ipfs/p2p/protocol"
pb "github.com/jbenet/go-ipfs/p2p/protocol/identify/pb"
config "github.com/jbenet/go-ipfs/repo/config"
......@@ -49,6 +50,10 @@ type IDService struct {
// for wait purposes
currid map[inet.Conn]chan struct{}
currmu sync.RWMutex
// our own observed addresses.
// TODO: instead of expiring, remove these when we disconnect
addrs peer.AddrManager
}
func NewIDService(h host.Host) *IDService {
......@@ -60,6 +65,11 @@ func NewIDService(h host.Host) *IDService {
return s
}
// OwnObservedAddrs returns the addresses peers have reported we've dialed from
func (ids *IDService) OwnObservedAddrs() []ma.Multiaddr {
return ids.addrs.Addrs(ids.Host.ID())
}
func (ids *IDService) IdentifyConn(c inet.Conn) {
ids.currmu.Lock()
if wait, found := ids.currid[c]; found {
......@@ -176,7 +186,7 @@ func (ids *IDService) consumeMessage(mes *pb.Identify, c inet.Conn) {
// update our peerstore with the addresses. here, we SET the addresses, clearing old ones.
// We are receiving from the peer itself. this is current address ground truth.
ids.Host.Peerstore().SetAddresses(p, lmaddrs)
ids.Host.Peerstore().SetAddrs(p, lmaddrs, peer.ConnectedAddrTTL)
log.Debugf("%s received listen addrs for %s: %s", c.LocalPeer(), c.RemotePeer(), lmaddrs)
// get protocol versions
......@@ -235,7 +245,7 @@ func (ids *IDService) consumeObservedAddress(observed []byte, c inet.Conn) {
// ok! we have the observed version of one of our ListenAddresses!
log.Debugf("added own observed listen addr: %s --> %s", c.LocalMultiaddr(), maddr)
ids.Host.Peerstore().AddAddress(ids.Host.ID(), maddr)
ids.addrs.AddAddr(ids.Host.ID(), maddr, peer.OwnObservedAddrTTL)
}
func addrInAddrs(a ma.Multiaddr, as []ma.Multiaddr) bool {
......@@ -246,3 +256,28 @@ func addrInAddrs(a ma.Multiaddr, as []ma.Multiaddr) bool {
}
return false
}
// netNotifiee defines methods to be used with the IpfsDHT
type netNotifiee IDService
func (nn *netNotifiee) IDService() *IDService {
return (*IDService)(nn)
}
func (nn *netNotifiee) Connected(n inet.Network, v inet.Conn) {
// TODO: deprecate the setConnHandler hook, and kick off
// identification here.
}
func (nn *netNotifiee) Disconnected(n inet.Network, v inet.Conn) {
// undo the setting of addresses to peer.ConnectedAddrTTL we did
ids := nn.IDService()
ps := ids.Host.Peerstore()
addrs := ps.Addrs(v.RemotePeer())
ps.SetAddrs(v.RemotePeer(), addrs, peer.RecentlyConnectedAddrTTL)
}
func (nn *netNotifiee) OpenedStream(n inet.Network, v inet.Stream) {}
func (nn *netNotifiee) ClosedStream(n inet.Network, v inet.Stream) {}
func (nn *netNotifiee) Listen(n inet.Network, a ma.Multiaddr) {}
func (nn *netNotifiee) ListenClose(n inet.Network, a ma.Multiaddr) {}
......@@ -38,7 +38,7 @@ func subtestIDService(t *testing.T, postDialWait time.Duration) {
// the IDService should be opened automatically, by the network.
// what we should see now is that both peers know about each others listen addresses.
testKnowsAddrs(t, h1, h2p, h2.Peerstore().Addresses(h2p)) // has them
testKnowsAddrs(t, h1, h2p, h2.Peerstore().Addrs(h2p)) // has them
testHasProtocolVersions(t, h1, h2p)
// now, this wait we do have to do. it's the wait for the Listening side
......@@ -50,12 +50,12 @@ func subtestIDService(t *testing.T, postDialWait time.Duration) {
<-h2.IDService().IdentifyWait(c[0])
// and the protocol versions.
testKnowsAddrs(t, h2, h1p, h1.Peerstore().Addresses(h1p)) // has them
testKnowsAddrs(t, h2, h1p, h1.Peerstore().Addrs(h1p)) // has them
testHasProtocolVersions(t, h2, h1p)
}
func testKnowsAddrs(t *testing.T, h host.Host, p peer.ID, expected []ma.Multiaddr) {
actual := h.Peerstore().Addresses(p)
actual := h.Peerstore().Addrs(p)
if len(actual) != len(expected) {
t.Error("dont have the same addresses")
......
......@@ -22,14 +22,14 @@ func GenSwarmNetwork(t *testing.T, ctx context.Context) *swarm.Network {
if err != nil {
t.Fatal(err)
}
ps.AddAddresses(p.ID, n.ListenAddresses())
ps.AddAddrs(p.ID, n.ListenAddresses(), peer.PermanentAddrTTL)
return n
}
func DivulgeAddresses(a, b inet.Network) {
id := a.LocalPeer()
addrs := a.Peerstore().Addresses(id)
b.Peerstore().AddAddresses(id, addrs)
addrs := a.Peerstore().Addrs(id)
b.Peerstore().AddAddrs(id, addrs, peer.PermanentAddrTTL)
}
func GenHostSwarm(t *testing.T, ctx context.Context) *bhost.BasicHost {
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment