Unverified Commit 57ed88e1 authored by Steven Allen's avatar Steven Allen Committed by GitHub
Browse files

Merge pull request #427 from cannium/track-more-observed-address-info

Track more info for observed addresses
parents 2787133b 92ec4b2d
...@@ -5,8 +5,6 @@ import ( ...@@ -5,8 +5,6 @@ import (
"io" "io"
"time" "time"
identify "github.com/libp2p/go-libp2p/p2p/protocol/identify"
logging "github.com/ipfs/go-log" logging "github.com/ipfs/go-log"
goprocess "github.com/jbenet/goprocess" goprocess "github.com/jbenet/goprocess"
goprocessctx "github.com/jbenet/goprocess/context" goprocessctx "github.com/jbenet/goprocess/context"
...@@ -15,6 +13,7 @@ import ( ...@@ -15,6 +13,7 @@ import (
peer "github.com/libp2p/go-libp2p-peer" peer "github.com/libp2p/go-libp2p-peer"
pstore "github.com/libp2p/go-libp2p-peerstore" pstore "github.com/libp2p/go-libp2p-peerstore"
protocol "github.com/libp2p/go-libp2p-protocol" protocol "github.com/libp2p/go-libp2p-protocol"
identify "github.com/libp2p/go-libp2p/p2p/protocol/identify"
ma "github.com/multiformats/go-multiaddr" ma "github.com/multiformats/go-multiaddr"
madns "github.com/multiformats/go-multiaddr-dns" madns "github.com/multiformats/go-multiaddr-dns"
msmux "github.com/multiformats/go-multistream" msmux "github.com/multiformats/go-multistream"
......
...@@ -393,7 +393,8 @@ func (ids *IDService) consumeObservedAddress(observed []byte, c inet.Conn) { ...@@ -393,7 +393,8 @@ func (ids *IDService) consumeObservedAddress(observed []byte, c inet.Conn) {
// ok! we have the observed version of one of our ListenAddresses! // ok! we have the observed version of one of our ListenAddresses!
log.Debugf("added own observed listen addr: %s --> %s", c.LocalMultiaddr(), maddr) log.Debugf("added own observed listen addr: %s --> %s", c.LocalMultiaddr(), maddr)
ids.observedAddrs.Add(maddr, c.RemoteMultiaddr()) ids.observedAddrs.Add(maddr, c.LocalMultiaddr(), c.RemoteMultiaddr(),
c.Stat().Direction)
} }
func addrInAddrs(a ma.Multiaddr, as []ma.Multiaddr) bool { func addrInAddrs(a ma.Multiaddr, as []ma.Multiaddr) bool {
......
...@@ -4,12 +4,18 @@ import ( ...@@ -4,12 +4,18 @@ import (
"sync" "sync"
"time" "time"
net "github.com/libp2p/go-libp2p-net"
pstore "github.com/libp2p/go-libp2p-peerstore" pstore "github.com/libp2p/go-libp2p-peerstore"
ma "github.com/multiformats/go-multiaddr" ma "github.com/multiformats/go-multiaddr"
) )
const ActivationThresh = 4 const ActivationThresh = 4
type observation struct {
seenTime time.Time
connDirection net.Direction
}
// ObservedAddr is an entry for an address reported by our peers. // ObservedAddr is an entry for an address reported by our peers.
// We only use addresses that: // We only use addresses that:
// - have been observed at least 4 times in last 1h. (counter symmetric nats) // - have been observed at least 4 times in last 1h. (counter symmetric nats)
...@@ -17,16 +23,15 @@ const ActivationThresh = 4 ...@@ -17,16 +23,15 @@ const ActivationThresh = 4
// network, or network port mapppings, may have changed. // network, or network port mapppings, may have changed.
type ObservedAddr struct { type ObservedAddr struct {
Addr ma.Multiaddr Addr ma.Multiaddr
SeenBy map[string]time.Time SeenBy map[string]observation // peer(observer) address -> observation info
LastSeen time.Time LastSeen time.Time
Activated bool
} }
func (oa *ObservedAddr) TryActivate(ttl time.Duration) bool { func (oa *ObservedAddr) activated(ttl time.Duration) bool {
// cleanup SeenBy set // cleanup SeenBy set
now := time.Now() now := time.Now()
for k, t := range oa.SeenBy { for k, ob := range oa.SeenBy {
if now.Sub(t) > ttl*ActivationThresh { if now.Sub(ob.seenTime) > ttl*ActivationThresh {
delete(oa.SeenBy, k) delete(oa.SeenBy, k)
} }
} }
...@@ -41,60 +46,75 @@ func (oa *ObservedAddr) TryActivate(ttl time.Duration) bool { ...@@ -41,60 +46,75 @@ func (oa *ObservedAddr) TryActivate(ttl time.Duration) bool {
type ObservedAddrSet struct { type ObservedAddrSet struct {
sync.Mutex // guards whole datastruct. sync.Mutex // guards whole datastruct.
addrs map[string]*ObservedAddr // local(internal) address -> list of observed(external) addresses
addrs map[string][]*ObservedAddr
ttl time.Duration ttl time.Duration
} }
func (oas *ObservedAddrSet) Addrs() []ma.Multiaddr { // Addrs return all activated observed addresses
func (oas *ObservedAddrSet) Addrs() (addrs []ma.Multiaddr) {
oas.Lock() oas.Lock()
defer oas.Unlock() defer oas.Unlock()
// for zero-value. // for zero-value.
if oas.addrs == nil { if len(oas.addrs) == 0 {
return nil return nil
} }
now := time.Now() now := time.Now()
addrs := make([]ma.Multiaddr, 0, len(oas.addrs)) for local, observedAddrs := range oas.addrs {
for s, a := range oas.addrs { filteredAddrs := make([]*ObservedAddr, 0, len(observedAddrs))
// remove timed out addresses. for _, a := range observedAddrs {
if now.Sub(a.LastSeen) > oas.ttl { // leave only alive observed addresses
delete(oas.addrs, s) if now.Sub(a.LastSeen) <= oas.ttl {
continue filteredAddrs = append(filteredAddrs, a)
} if a.activated(oas.ttl) {
if a.Activated || a.TryActivate(oas.ttl) {
addrs = append(addrs, a.Addr) addrs = append(addrs, a.Addr)
} }
} }
}
oas.addrs[local] = filteredAddrs
}
return addrs return addrs
} }
func (oas *ObservedAddrSet) Add(addr ma.Multiaddr, observer ma.Multiaddr) { func (oas *ObservedAddrSet) Add(observed, local, observer ma.Multiaddr,
direction net.Direction) {
oas.Lock() oas.Lock()
defer oas.Unlock() defer oas.Unlock()
// for zero-value. // for zero-value.
if oas.addrs == nil { if oas.addrs == nil {
oas.addrs = make(map[string]*ObservedAddr) oas.addrs = make(map[string][]*ObservedAddr)
oas.ttl = pstore.OwnObservedAddrTTL oas.ttl = pstore.OwnObservedAddrTTL
} }
s := addr.String() now := time.Now()
oa, found := oas.addrs[s] observerString := observerGroup(observer)
localString := local.String()
ob := observation{
seenTime: now,
connDirection: direction,
}
// first time seeing address. observedAddrs := oas.addrs[localString]
if !found { // check if observed address seen yet, if so, update it
oa = &ObservedAddr{ for i, previousObserved := range observedAddrs {
Addr: addr, if previousObserved.Addr.Equal(observed) {
SeenBy: make(map[string]time.Time), observedAddrs[i].SeenBy[observerString] = ob
observedAddrs[i].LastSeen = now
return
} }
oas.addrs[s] = oa
} }
// observed address not seen yet, append it
// mark the observer oas.addrs[localString] = append(oas.addrs[localString], &ObservedAddr{
oa.SeenBy[observerGroup(observer)] = time.Now() Addr: observed,
oa.LastSeen = time.Now() SeenBy: map[string]observation{
observerString: ob,
},
LastSeen: now,
})
} }
// observerGroup is a function that determines what part of // observerGroup is a function that determines what part of
......
...@@ -4,6 +4,7 @@ import ( ...@@ -4,6 +4,7 @@ import (
"testing" "testing"
"time" "time"
net "github.com/libp2p/go-libp2p-net"
ma "github.com/multiformats/go-multiaddr" ma "github.com/multiformats/go-multiaddr"
) )
...@@ -49,15 +50,22 @@ func TestObsAddrSet(t *testing.T) { ...@@ -49,15 +50,22 @@ func TestObsAddrSet(t *testing.T) {
b4 := m("/ip4/1.2.3.9/tcp/1237") b4 := m("/ip4/1.2.3.9/tcp/1237")
b5 := m("/ip4/1.2.3.10/tcp/1237") b5 := m("/ip4/1.2.3.10/tcp/1237")
oas := ObservedAddrSet{} oas := &ObservedAddrSet{}
if !addrsMarch(oas.Addrs(), nil) { if !addrsMarch(oas.Addrs(), nil) {
t.Error("addrs should be empty") t.Error("addrs should be empty")
} }
oas.Add(a1, a4) add := func(oas *ObservedAddrSet, observed, observer ma.Multiaddr) {
oas.Add(a2, a4) dummyLocal := m("/ip4/127.0.0.1/tcp/10086")
oas.Add(a3, a4) dummyDirection := net.DirOutbound
oas.Add(observed, dummyLocal, observer, dummyDirection)
}
add(oas, a1, a4)
add(oas, a2, a4)
add(oas, a3, a4)
// these are all different so we should not yet get them. // these are all different so we should not yet get them.
if !addrsMarch(oas.Addrs(), nil) { if !addrsMarch(oas.Addrs(), nil) {
...@@ -65,39 +73,39 @@ func TestObsAddrSet(t *testing.T) { ...@@ -65,39 +73,39 @@ func TestObsAddrSet(t *testing.T) {
} }
// same observer, so should not yet get them. // same observer, so should not yet get them.
oas.Add(a1, a4) add(oas, a1, a4)
oas.Add(a2, a4) add(oas, a2, a4)
oas.Add(a3, a4) add(oas, a3, a4)
if !addrsMarch(oas.Addrs(), nil) { if !addrsMarch(oas.Addrs(), nil) {
t.Error("addrs should _still_ be empty (same obs)") t.Error("addrs should _still_ be empty (same obs)")
} }
// different observer, but same observer group. // different observer, but same observer group.
oas.Add(a1, a5) add(oas, a1, a5)
oas.Add(a2, a5) add(oas, a2, a5)
oas.Add(a3, a5) add(oas, a3, a5)
if !addrsMarch(oas.Addrs(), nil) { if !addrsMarch(oas.Addrs(), nil) {
t.Error("addrs should _still_ be empty (same obs group)") t.Error("addrs should _still_ be empty (same obs group)")
} }
oas.Add(a1, b1) add(oas, a1, b1)
oas.Add(a1, b2) add(oas, a1, b2)
oas.Add(a1, b3) add(oas, a1, b3)
if !addrsMarch(oas.Addrs(), []ma.Multiaddr{a1}) { if !addrsMarch(oas.Addrs(), []ma.Multiaddr{a1}) {
t.Error("addrs should only have a1") t.Error("addrs should only have a1")
} }
oas.Add(a2, a5) add(oas, a2, a5)
oas.Add(a1, a5) add(oas, a1, a5)
oas.Add(a1, a5) add(oas, a1, a5)
oas.Add(a2, b1) add(oas, a2, b1)
oas.Add(a1, b1) add(oas, a1, b1)
oas.Add(a1, b1) add(oas, a1, b1)
oas.Add(a2, b2) add(oas, a2, b2)
oas.Add(a1, b2) add(oas, a1, b2)
oas.Add(a1, b2) add(oas, a1, b2)
oas.Add(a2, b4) add(oas, a2, b4)
oas.Add(a2, b5) add(oas, a2, b5)
if !addrsMarch(oas.Addrs(), []ma.Multiaddr{a1, a2}) { if !addrsMarch(oas.Addrs(), []ma.Multiaddr{a1, a2}) {
t.Error("addrs should only have a1, a2") t.Error("addrs should only have a1, a2")
} }
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment