diff --git a/p2p/host/basic/basic_host.go b/p2p/host/basic/basic_host.go index 5379bed4b264e4840ed2d26a139041541d92a9be..1838cbc2fb09653adfd38b8c991ead409181e426 100644 --- a/p2p/host/basic/basic_host.go +++ b/p2p/host/basic/basic_host.go @@ -5,8 +5,6 @@ import ( "io" "time" - identify "github.com/libp2p/go-libp2p/p2p/protocol/identify" - logging "github.com/ipfs/go-log" goprocess "github.com/jbenet/goprocess" goprocessctx "github.com/jbenet/goprocess/context" @@ -15,6 +13,7 @@ import ( peer "github.com/libp2p/go-libp2p-peer" pstore "github.com/libp2p/go-libp2p-peerstore" protocol "github.com/libp2p/go-libp2p-protocol" + identify "github.com/libp2p/go-libp2p/p2p/protocol/identify" ma "github.com/multiformats/go-multiaddr" madns "github.com/multiformats/go-multiaddr-dns" msmux "github.com/multiformats/go-multistream" diff --git a/p2p/protocol/identify/id.go b/p2p/protocol/identify/id.go index 1f8dc012032d7d9fdceb5f13166b961673378086..4fc88dd450da994260f1e7be4068274c9651d3ca 100644 --- a/p2p/protocol/identify/id.go +++ b/p2p/protocol/identify/id.go @@ -393,7 +393,8 @@ func (ids *IDService) consumeObservedAddress(observed []byte, c inet.Conn) { // ok! we have the observed version of one of our ListenAddresses! log.Debugf("added own observed listen addr: %s --> %s", c.LocalMultiaddr(), maddr) - ids.observedAddrs.Add(maddr, c.RemoteMultiaddr()) + ids.observedAddrs.Add(maddr, c.LocalMultiaddr(), c.RemoteMultiaddr(), + c.Stat().Direction) } func addrInAddrs(a ma.Multiaddr, as []ma.Multiaddr) bool { diff --git a/p2p/protocol/identify/obsaddr.go b/p2p/protocol/identify/obsaddr.go index 0e72ab343336cd41f6112653d44975756edc03aa..40c6f78df256d3cffdaa80662ace775d9ab7075c 100644 --- a/p2p/protocol/identify/obsaddr.go +++ b/p2p/protocol/identify/obsaddr.go @@ -4,29 +4,34 @@ import ( "sync" "time" + net "github.com/libp2p/go-libp2p-net" pstore "github.com/libp2p/go-libp2p-peerstore" ma "github.com/multiformats/go-multiaddr" ) const ActivationThresh = 4 +type observation struct { + seenTime time.Time + connDirection net.Direction +} + // ObservedAddr is an entry for an address reported by our peers. // We only use addresses that: // - have been observed at least 4 times in last 1h. (counter symmetric nats) // - have been observed at least once recently (1h), because our position in the // network, or network port mapppings, may have changed. type ObservedAddr struct { - Addr ma.Multiaddr - SeenBy map[string]time.Time - LastSeen time.Time - Activated bool + Addr ma.Multiaddr + SeenBy map[string]observation // peer(observer) address -> observation info + LastSeen time.Time } -func (oa *ObservedAddr) TryActivate(ttl time.Duration) bool { +func (oa *ObservedAddr) activated(ttl time.Duration) bool { // cleanup SeenBy set now := time.Now() - for k, t := range oa.SeenBy { - if now.Sub(t) > ttl*ActivationThresh { + for k, ob := range oa.SeenBy { + if now.Sub(ob.seenTime) > ttl*ActivationThresh { delete(oa.SeenBy, k) } } @@ -41,60 +46,75 @@ func (oa *ObservedAddr) TryActivate(ttl time.Duration) bool { type ObservedAddrSet struct { sync.Mutex // guards whole datastruct. - addrs map[string]*ObservedAddr + // local(internal) address -> list of observed(external) addresses + addrs map[string][]*ObservedAddr ttl time.Duration } -func (oas *ObservedAddrSet) Addrs() []ma.Multiaddr { +// Addrs return all activated observed addresses +func (oas *ObservedAddrSet) Addrs() (addrs []ma.Multiaddr) { oas.Lock() defer oas.Unlock() // for zero-value. - if oas.addrs == nil { + if len(oas.addrs) == 0 { return nil } now := time.Now() - addrs := make([]ma.Multiaddr, 0, len(oas.addrs)) - for s, a := range oas.addrs { - // remove timed out addresses. - if now.Sub(a.LastSeen) > oas.ttl { - delete(oas.addrs, s) - continue - } - - if a.Activated || a.TryActivate(oas.ttl) { - addrs = append(addrs, a.Addr) + for local, observedAddrs := range oas.addrs { + filteredAddrs := make([]*ObservedAddr, 0, len(observedAddrs)) + for _, a := range observedAddrs { + // leave only alive observed addresses + if now.Sub(a.LastSeen) <= oas.ttl { + filteredAddrs = append(filteredAddrs, a) + if a.activated(oas.ttl) { + addrs = append(addrs, a.Addr) + } + } } + oas.addrs[local] = filteredAddrs } return addrs } -func (oas *ObservedAddrSet) Add(addr ma.Multiaddr, observer ma.Multiaddr) { +func (oas *ObservedAddrSet) Add(observed, local, observer ma.Multiaddr, + direction net.Direction) { + oas.Lock() defer oas.Unlock() // for zero-value. if oas.addrs == nil { - oas.addrs = make(map[string]*ObservedAddr) + oas.addrs = make(map[string][]*ObservedAddr) oas.ttl = pstore.OwnObservedAddrTTL } - s := addr.String() - oa, found := oas.addrs[s] + now := time.Now() + observerString := observerGroup(observer) + localString := local.String() + ob := observation{ + seenTime: now, + connDirection: direction, + } - // first time seeing address. - if !found { - oa = &ObservedAddr{ - Addr: addr, - SeenBy: make(map[string]time.Time), + observedAddrs := oas.addrs[localString] + // check if observed address seen yet, if so, update it + for i, previousObserved := range observedAddrs { + if previousObserved.Addr.Equal(observed) { + observedAddrs[i].SeenBy[observerString] = ob + observedAddrs[i].LastSeen = now + return } - oas.addrs[s] = oa } - - // mark the observer - oa.SeenBy[observerGroup(observer)] = time.Now() - oa.LastSeen = time.Now() + // observed address not seen yet, append it + oas.addrs[localString] = append(oas.addrs[localString], &ObservedAddr{ + Addr: observed, + SeenBy: map[string]observation{ + observerString: ob, + }, + LastSeen: now, + }) } // observerGroup is a function that determines what part of diff --git a/p2p/protocol/identify/obsaddr_test.go b/p2p/protocol/identify/obsaddr_test.go index acf3d30d007e7e7ff717077c4b3f6e2264725e1f..7291326218637926bfccfc1781391a9598d62dc1 100644 --- a/p2p/protocol/identify/obsaddr_test.go +++ b/p2p/protocol/identify/obsaddr_test.go @@ -4,6 +4,7 @@ import ( "testing" "time" + net "github.com/libp2p/go-libp2p-net" ma "github.com/multiformats/go-multiaddr" ) @@ -49,15 +50,22 @@ func TestObsAddrSet(t *testing.T) { b4 := m("/ip4/1.2.3.9/tcp/1237") b5 := m("/ip4/1.2.3.10/tcp/1237") - oas := ObservedAddrSet{} + oas := &ObservedAddrSet{} if !addrsMarch(oas.Addrs(), nil) { t.Error("addrs should be empty") } - oas.Add(a1, a4) - oas.Add(a2, a4) - oas.Add(a3, a4) + add := func(oas *ObservedAddrSet, observed, observer ma.Multiaddr) { + dummyLocal := m("/ip4/127.0.0.1/tcp/10086") + dummyDirection := net.DirOutbound + + oas.Add(observed, dummyLocal, observer, dummyDirection) + } + + add(oas, a1, a4) + add(oas, a2, a4) + add(oas, a3, a4) // these are all different so we should not yet get them. if !addrsMarch(oas.Addrs(), nil) { @@ -65,39 +73,39 @@ func TestObsAddrSet(t *testing.T) { } // same observer, so should not yet get them. - oas.Add(a1, a4) - oas.Add(a2, a4) - oas.Add(a3, a4) + add(oas, a1, a4) + add(oas, a2, a4) + add(oas, a3, a4) if !addrsMarch(oas.Addrs(), nil) { t.Error("addrs should _still_ be empty (same obs)") } // different observer, but same observer group. - oas.Add(a1, a5) - oas.Add(a2, a5) - oas.Add(a3, a5) + add(oas, a1, a5) + add(oas, a2, a5) + add(oas, a3, a5) if !addrsMarch(oas.Addrs(), nil) { t.Error("addrs should _still_ be empty (same obs group)") } - oas.Add(a1, b1) - oas.Add(a1, b2) - oas.Add(a1, b3) + add(oas, a1, b1) + add(oas, a1, b2) + add(oas, a1, b3) if !addrsMarch(oas.Addrs(), []ma.Multiaddr{a1}) { t.Error("addrs should only have a1") } - oas.Add(a2, a5) - oas.Add(a1, a5) - oas.Add(a1, a5) - oas.Add(a2, b1) - oas.Add(a1, b1) - oas.Add(a1, b1) - oas.Add(a2, b2) - oas.Add(a1, b2) - oas.Add(a1, b2) - oas.Add(a2, b4) - oas.Add(a2, b5) + add(oas, a2, a5) + add(oas, a1, a5) + add(oas, a1, a5) + add(oas, a2, b1) + add(oas, a1, b1) + add(oas, a1, b1) + add(oas, a2, b2) + add(oas, a1, b2) + add(oas, a1, b2) + add(oas, a2, b4) + add(oas, a2, b5) if !addrsMarch(oas.Addrs(), []ma.Multiaddr{a1, a2}) { t.Error("addrs should only have a1, a2") }