Commit 96df62bd authored by Can ZHANG's avatar Can ZHANG
Browse files

Changes on discussion

- map internal -> []{external -> { observer -> [time, direction] } }
- some cleaning
parent b4e4d496
...@@ -11,25 +11,27 @@ import ( ...@@ -11,25 +11,27 @@ import (
const ActivationThresh = 4 const ActivationThresh = 4
type observation struct {
seenTime time.Time
connDirection net.Direction
}
// ObservedAddr is an entry for an address reported by our peers. // ObservedAddr is an entry for an address reported by our peers.
// We only use addresses that: // We only use addresses that:
// - have been observed at least 4 times in last 1h. (counter symmetric nats) // - have been observed at least 4 times in last 1h. (counter symmetric nats)
// - have been observed at least once recently (1h), because our position in the // - have been observed at least once recently (1h), because our position in the
// network, or network port mapppings, may have changed. // network, or network port mapppings, may have changed.
type ObservedAddr struct { type ObservedAddr struct {
Addr ma.Multiaddr // observed address by peers Addr ma.Multiaddr
InternalAddr ma.Multiaddr // corresponding internal address SeenBy map[string]observation // peer(observer) address -> observation info
SeenBy map[string]time.Time LastSeen time.Time
LastSeen time.Time
ConnDirection net.Direction
Activated bool
} }
func (oa *ObservedAddr) TryActivate(ttl time.Duration) bool { func (oa *ObservedAddr) activated(ttl time.Duration) bool {
// cleanup SeenBy set // cleanup SeenBy set
now := time.Now() now := time.Now()
for k, t := range oa.SeenBy { for k, ob := range oa.SeenBy {
if now.Sub(t) > ttl*ActivationThresh { if now.Sub(ob.seenTime) > ttl*ActivationThresh {
delete(oa.SeenBy, k) delete(oa.SeenBy, k)
} }
} }
...@@ -44,32 +46,37 @@ func (oa *ObservedAddr) TryActivate(ttl time.Duration) bool { ...@@ -44,32 +46,37 @@ func (oa *ObservedAddr) TryActivate(ttl time.Duration) bool {
type ObservedAddrSet struct { type ObservedAddrSet struct {
sync.Mutex // guards whole datastruct. sync.Mutex // guards whole datastruct.
addrs map[string]*ObservedAddr // local(internal) address -> list of observed(external) addresses
addrs map[string][]*ObservedAddr
ttl time.Duration ttl time.Duration
} }
func (oas *ObservedAddrSet) Addrs() []ma.Multiaddr { // Addrs return all activated observed addresses
func (oas *ObservedAddrSet) Addrs() (addrs []ma.Multiaddr) {
oas.Lock() oas.Lock()
defer oas.Unlock() defer oas.Unlock()
// for zero-value. // for zero-value.
if oas.addrs == nil { if len(oas.addrs) == 0 {
return nil return nil
} }
now := time.Now() now := time.Now()
addrs := make([]ma.Multiaddr, 0, len(oas.addrs)) filteredAddrMap := make(map[string][]*ObservedAddr)
for s, a := range oas.addrs { for local, observedAddrs := range oas.addrs {
// remove timed out addresses. filteredAddrs := make([]*ObservedAddr, 0, len(observedAddrs))
if now.Sub(a.LastSeen) > oas.ttl { for _, a := range observedAddrs {
delete(oas.addrs, s) // leave only alive observed addresses
continue if now.Sub(a.LastSeen) <= oas.ttl {
} filteredAddrs = append(filteredAddrs, a)
if a.activated(oas.ttl) {
if a.Activated || a.TryActivate(oas.ttl) { addrs = append(addrs, a.Addr)
addrs = append(addrs, a.Addr) }
}
} }
filteredAddrMap[local] = filteredAddrs
} }
oas.addrs = filteredAddrMap
return addrs return addrs
} }
...@@ -81,27 +88,43 @@ func (oas *ObservedAddrSet) Add(observed, local, observer ma.Multiaddr, ...@@ -81,27 +88,43 @@ func (oas *ObservedAddrSet) Add(observed, local, observer ma.Multiaddr,
// for zero-value. // for zero-value.
if oas.addrs == nil { if oas.addrs == nil {
oas.addrs = make(map[string]*ObservedAddr) oas.addrs = make(map[string][]*ObservedAddr)
oas.ttl = pstore.OwnObservedAddrTTL oas.ttl = pstore.OwnObservedAddrTTL
} }
s := observed.String() now := time.Now()
oa, found := oas.addrs[s] observerString := observerGroup(observer)
localString := local.String()
observedAddr := &ObservedAddr{
Addr: observed,
SeenBy: map[string]observation{
observerString: {
seenTime: now,
connDirection: direction,
},
},
LastSeen: now,
}
// first time seeing address. observedAddrs, found := oas.addrs[localString]
// map key not exist yet, init with new values
if !found { if !found {
oa = &ObservedAddr{ oas.addrs[localString] = []*ObservedAddr{observedAddr}
Addr: observed, return
InternalAddr: local, }
SeenBy: make(map[string]time.Time), // check if observed address seen yet, if so, update it
ConnDirection: direction, for i, previousObserved := range observedAddrs {
if previousObserved.Addr.Equal(observed) {
observedAddrs[i].SeenBy[observerString] = observation{
seenTime: now,
connDirection: direction,
}
observedAddrs[i].LastSeen = now
return
} }
oas.addrs[s] = oa
} }
// observed address not seen yet, append it
// mark the observer oas.addrs[localString] = append(oas.addrs[localString], observedAddr)
oa.SeenBy[observerGroup(observer)] = time.Now()
oa.LastSeen = time.Now()
} }
// observerGroup is a function that determines what part of // observerGroup is a function that determines what part of
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment