Commit 0c1fe86b authored by Juan Batiz-Benet's avatar Juan Batiz-Benet
Browse files

introducing p2p pkg

I think it's time to move a lot of the peer-to-peer networking
but-not-ipfs-specific things into its own package: p2p.
This could in the future be split off into its own library.
The first thing to go is the peer.
parents
package peer
import (
"sync"
"time"
)
// LatencyEWMASmooting governs the decay of the EWMA (the speed
// at which it changes). This must be a normalized (0-1) value.
// 1 is 100% change, 0 is no change.
var LatencyEWMASmoothing = 0.1
// Metrics is just an object that tracks metrics
// across a set of peers.
type Metrics interface {
// RecordLatency records a new latency measurement
RecordLatency(ID, time.Duration)
// LatencyEWMA returns an exponentially-weighted moving avg.
// of all measurements of a peer's latency.
LatencyEWMA(ID) time.Duration
}
type metrics struct {
latmap map[ID]time.Duration
latmu sync.RWMutex
}
func NewMetrics() Metrics {
return &metrics{
latmap: make(map[ID]time.Duration),
}
}
// RecordLatency records a new latency measurement
func (m *metrics) RecordLatency(p ID, next time.Duration) {
nextf := float64(next)
s := LatencyEWMASmoothing
if s > 1 || s < 0 {
s = 0.1 // ignore the knob. it's broken. look, it jiggles.
}
m.latmu.Lock()
ewma, found := m.latmap[p]
ewmaf := float64(ewma)
if !found {
m.latmap[p] = next // when no data, just take it as the mean.
} else {
nextf = ((1.0 - s) * ewmaf) + (s * nextf)
m.latmap[p] = time.Duration(nextf)
}
m.latmu.Unlock()
}
// LatencyEWMA returns an exponentially-weighted moving avg.
// of all measurements of a peer's latency.
func (m *metrics) LatencyEWMA(p ID) time.Duration {
m.latmu.RLock()
lat := m.latmap[p]
m.latmu.RUnlock()
return time.Duration(lat)
}
package peer_test
import (
"fmt"
"math/rand"
"testing"
"time"
peer "github.com/jbenet/go-ipfs/p2p/peer"
testutil "github.com/jbenet/go-ipfs/util/testutil"
)
func TestLatencyEWMAFun(t *testing.T) {
t.Skip("run it for fun")
m := peer.NewMetrics()
id, err := testutil.RandPeerID()
if err != nil {
t.Fatal(err)
}
mu := 100.0
sig := 10.0
next := func() time.Duration {
mu = (rand.NormFloat64() * sig) + mu
return time.Duration(mu)
}
print := func() {
fmt.Printf("%3.f %3.f --> %d\n", sig, mu, m.LatencyEWMA(id))
}
for {
select {
case <-time.After(200 * time.Millisecond):
m.RecordLatency(id, next())
print()
}
}
}
// package peer implements an object used to represent peers in the ipfs network.
package peer
import (
"encoding/hex"
"fmt"
b58 "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-base58"
ma "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr"
mh "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multihash"
ic "github.com/jbenet/go-ipfs/crypto"
u "github.com/jbenet/go-ipfs/util"
)
var log = u.Logger("peer")
// ID represents the identity of a peer.
type ID string
// Pretty returns a b58-encoded string of the ID
func (id ID) Pretty() string {
return IDB58Encode(id)
}
func (id ID) Loggable() map[string]interface{} {
return map[string]interface{}{
"peerID": id.Pretty(),
}
}
// String prints out the peer.
//
// TODO(brian): ensure correctness at ID generation and
// enforce this by only exposing functions that generate
// IDs safely. Then any peer.ID type found in the
// codebase is known to be correct.
func (id ID) String() string {
pid := id.Pretty()
maxRunes := 6
if len(pid) < maxRunes {
maxRunes = len(pid)
}
return fmt.Sprintf("<peer.ID %s>", pid[:maxRunes])
}
// MatchesPrivateKey tests whether this ID was derived from sk
func (id ID) MatchesPrivateKey(sk ic.PrivKey) bool {
return id.MatchesPublicKey(sk.GetPublic())
}
// MatchesPublicKey tests whether this ID was derived from pk
func (id ID) MatchesPublicKey(pk ic.PubKey) bool {
oid, err := IDFromPublicKey(pk)
if err != nil {
return false
}
return oid == id
}
// IDFromString cast a string to ID type, and validate
// the id to make sure it is a multihash.
func IDFromString(s string) (ID, error) {
if _, err := mh.Cast([]byte(s)); err != nil {
return ID(""), err
}
return ID(s), nil
}
// IDFromBytes cast a string to ID type, and validate
// the id to make sure it is a multihash.
func IDFromBytes(b []byte) (ID, error) {
if _, err := mh.Cast(b); err != nil {
return ID(""), err
}
return ID(b), nil
}
// IDB58Decode returns a b58-decoded Peer
func IDB58Decode(s string) (ID, error) {
m, err := mh.FromB58String(s)
if err != nil {
return "", err
}
return ID(m), err
}
// IDB58Encode returns b58-encoded string
func IDB58Encode(id ID) string {
return b58.Encode([]byte(id))
}
// IDHexDecode returns a b58-decoded Peer
func IDHexDecode(s string) (ID, error) {
m, err := mh.FromHexString(s)
if err != nil {
return "", err
}
return ID(m), err
}
// IDHexEncode returns b58-encoded string
func IDHexEncode(id ID) string {
return hex.EncodeToString([]byte(id))
}
// IDFromPublicKey returns the Peer ID corresponding to pk
func IDFromPublicKey(pk ic.PubKey) (ID, error) {
b, err := pk.Bytes()
if err != nil {
return "", err
}
hash := u.Hash(b)
return ID(hash), nil
}
// IDFromPrivateKey returns the Peer ID corresponding to sk
func IDFromPrivateKey(sk ic.PrivKey) (ID, error) {
return IDFromPublicKey(sk.GetPublic())
}
// Map maps a Peer ID to a struct.
type Set map[ID]struct{}
// PeerInfo is a small struct used to pass around a peer with
// a set of addresses (and later, keys?). This is not meant to be
// a complete view of the system, but rather to model updates to
// the peerstore. It is used by things like the routing system.
type PeerInfo struct {
ID ID
Addrs []ma.Multiaddr
}
package peer_test
import (
"encoding/base64"
"fmt"
"strings"
"testing"
ic "github.com/jbenet/go-ipfs/crypto"
. "github.com/jbenet/go-ipfs/peer"
u "github.com/jbenet/go-ipfs/util"
tu "github.com/jbenet/go-ipfs/util/testutil"
b58 "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-base58"
)
var gen1 keyset // generated
var gen2 keyset // generated
var man keyset // manual
func init() {
if err := gen1.generate(); err != nil {
panic(err)
}
if err := gen2.generate(); err != nil {
panic(err)
}
skManBytes = strings.Replace(skManBytes, "\n", "", -1)
if err := man.load(hpkpMan, skManBytes); err != nil {
panic(err)
}
}
type keyset struct {
sk ic.PrivKey
pk ic.PubKey
hpk string
hpkp string
}
func (ks *keyset) generate() error {
var err error
ks.sk, ks.pk, err = tu.RandKeyPair(512)
if err != nil {
return err
}
bpk, err := ks.pk.Bytes()
if err != nil {
return err
}
ks.hpk = string(u.Hash(bpk))
ks.hpkp = b58.Encode([]byte(ks.hpk))
return nil
}
func (ks *keyset) load(hpkp, skBytesStr string) error {
skBytes, err := base64.StdEncoding.DecodeString(skBytesStr)
if err != nil {
return err
}
ks.sk, err = ic.UnmarshalPrivateKey(skBytes)
if err != nil {
return err
}
ks.pk = ks.sk.GetPublic()
bpk, err := ks.pk.Bytes()
if err != nil {
return err
}
ks.hpk = string(u.Hash(bpk))
ks.hpkp = b58.Encode([]byte(ks.hpk))
if ks.hpkp != hpkp {
return fmt.Errorf("hpkp doesn't match key. %s", hpkp)
}
return nil
}
func TestIDMatchesPublicKey(t *testing.T) {
test := func(ks keyset) {
p1, err := IDB58Decode(ks.hpkp)
if err != nil {
t.Fatal(err)
}
if ks.hpk != string(p1) {
t.Error("p1 and hpk differ")
}
if !p1.MatchesPublicKey(ks.pk) {
t.Fatal("p1 does not match pk")
}
p2, err := IDFromPublicKey(ks.pk)
if err != nil {
t.Fatal(err)
}
if p1 != p2 {
t.Error("p1 and p2 differ", p1.Pretty(), p2.Pretty())
}
if p2.Pretty() != ks.hpkp {
t.Error("hpkp and p2.Pretty differ", ks.hpkp, p2.Pretty())
}
}
test(gen1)
test(gen2)
test(man)
}
func TestIDMatchesPrivateKey(t *testing.T) {
test := func(ks keyset) {
p1, err := IDB58Decode(ks.hpkp)
if err != nil {
t.Fatal(err)
}
if ks.hpk != string(p1) {
t.Error("p1 and hpk differ")
}
if !p1.MatchesPrivateKey(ks.sk) {
t.Fatal("p1 does not match sk")
}
p2, err := IDFromPrivateKey(ks.sk)
if err != nil {
t.Fatal(err)
}
if p1 != p2 {
t.Error("p1 and p2 differ", p1.Pretty(), p2.Pretty())
}
}
test(gen1)
test(gen2)
test(man)
}
var hpkpMan = `QmRK3JgmVEGiewxWbhpXLJyjWuGuLeSTMTndA1coMHEy5o`
var skManBytes = `
CAAS4AQwggJcAgEAAoGBAL7w+Wc4VhZhCdM/+Hccg5Nrf4q9NXWwJylbSrXz/unFS24wyk6pEk0zi3W
7li+vSNVO+NtJQw9qGNAMtQKjVTP+3Vt/jfQRnQM3s6awojtjueEWuLYVt62z7mofOhCtj+VwIdZNBo
/EkLZ0ETfcvN5LVtLYa8JkXybnOPsLvK+PAgMBAAECgYBdk09HDM7zzL657uHfzfOVrdslrTCj6p5mo
DzvCxLkkjIzYGnlPuqfNyGjozkpSWgSUc+X+EGLLl3WqEOVdWJtbM61fewEHlRTM5JzScvwrJ39t7o6
CCAjKA0cBWBd6UWgbN/t53RoWvh9HrA2AW5YrT0ZiAgKe9y7EMUaENVJ8QJBAPhpdmb4ZL4Fkm4OKia
NEcjzn6mGTlZtef7K/0oRC9+2JkQnCuf6HBpaRhJoCJYg7DW8ZY+AV6xClKrgjBOfERMCQQDExhnzu2
dsQ9k8QChBlpHO0TRbZBiQfC70oU31kM1AeLseZRmrxv9Yxzdl8D693NNWS2JbKOXl0kMHHcuGQLMVA
kBZ7WvkmPV3aPL6jnwp2pXepntdVnaTiSxJ1dkXShZ/VSSDNZMYKY306EtHrIu3NZHtXhdyHKcggDXr
qkBrdgErAkAlpGPojUwemOggr4FD8sLX1ot2hDJyyV7OK2FXfajWEYJyMRL1Gm9Uk1+Un53RAkJneqp
JGAzKpyttXBTIDO51AkEA98KTiROMnnU8Y6Mgcvr68/SMIsvCYMt9/mtwSBGgl80VaTQ5Hpaktl6Xbh
VUt5Wv0tRxlXZiViCGCD1EtrrwTw==
`
package peer
import (
"errors"
"sync"
ic "github.com/jbenet/go-ipfs/crypto"
ds "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore"
dssync "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore/sync"
ma "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr"
)
// Peerstore provides a threadsafe store of Peer related
// information.
type Peerstore interface {
KeyBook
AddressBook
Metrics
// Peers returns a list of all peer.IDs in this Peerstore
Peers() []ID
// PeerInfo returns a peer.PeerInfo struct for given peer.ID.
// This is a small slice of the information Peerstore has on
// that peer, useful to other services.
PeerInfo(ID) PeerInfo
// Get/Put is a simple registry for other peer-related key/value pairs.
// if we find something we use often, it should become its own set of
// methods. this is a last resort.
Get(id ID, key string) (interface{}, error)
Put(id ID, key string, val interface{}) error
}
// AddressBook tracks the addresses of Peers
type AddressBook interface {
Addresses(ID) []ma.Multiaddr
AddAddress(ID, ma.Multiaddr)
AddAddresses(ID, []ma.Multiaddr)
}
type addressMap map[string]ma.Multiaddr
type addressbook struct {
addrs map[ID]addressMap
sync.RWMutex
}
func newAddressbook() *addressbook {
return &addressbook{addrs: map[ID]addressMap{}}
}
func (ab *addressbook) Peers() []ID {
ab.RLock()
ps := make([]ID, 0, len(ab.addrs))
for p := range ab.addrs {
ps = append(ps, p)
}
ab.RUnlock()
return ps
}
func (ab *addressbook) Addresses(p ID) []ma.Multiaddr {
ab.RLock()
defer ab.RUnlock()
maddrs, found := ab.addrs[p]
if !found {
return nil
}
maddrs2 := make([]ma.Multiaddr, 0, len(maddrs))
for _, m := range maddrs {
maddrs2 = append(maddrs2, m)
}
return maddrs2
}
func (ab *addressbook) AddAddress(p ID, m ma.Multiaddr) {
ab.Lock()
defer ab.Unlock()
_, found := ab.addrs[p]
if !found {
ab.addrs[p] = addressMap{}
}
ab.addrs[p][m.String()] = m
}
func (ab *addressbook) AddAddresses(p ID, ms []ma.Multiaddr) {
ab.Lock()
defer ab.Unlock()
for _, m := range ms {
_, found := ab.addrs[p]
if !found {
ab.addrs[p] = addressMap{}
}
ab.addrs[p][m.String()] = m
}
}
// KeyBook tracks the Public keys of Peers.
type KeyBook interface {
PubKey(ID) ic.PubKey
AddPubKey(ID, ic.PubKey) error
PrivKey(ID) ic.PrivKey
AddPrivKey(ID, ic.PrivKey) error
}
type keybook struct {
pks map[ID]ic.PubKey
sks map[ID]ic.PrivKey
sync.RWMutex // same lock. wont happen a ton.
}
func newKeybook() *keybook {
return &keybook{
pks: map[ID]ic.PubKey{},
sks: map[ID]ic.PrivKey{},
}
}
func (kb *keybook) Peers() []ID {
kb.RLock()
ps := make([]ID, 0, len(kb.pks)+len(kb.sks))
for p := range kb.pks {
ps = append(ps, p)
}
for p := range kb.sks {
if _, found := kb.pks[p]; !found {
ps = append(ps, p)
}
}
kb.RUnlock()
return ps
}
func (kb *keybook) PubKey(p ID) ic.PubKey {
kb.RLock()
pk := kb.pks[p]
kb.RUnlock()
return pk
}
func (kb *keybook) AddPubKey(p ID, pk ic.PubKey) error {
// check it's correct first
if !p.MatchesPublicKey(pk) {
return errors.New("ID does not match PublicKey")
}
kb.Lock()
kb.pks[p] = pk
kb.Unlock()
return nil
}
func (kb *keybook) PrivKey(p ID) ic.PrivKey {
kb.RLock()
sk := kb.sks[p]
kb.RUnlock()
return sk
}
func (kb *keybook) AddPrivKey(p ID, sk ic.PrivKey) error {
if sk == nil {
return errors.New("sk is nil (PrivKey)")
}
// check it's correct first
if !p.MatchesPrivateKey(sk) {
return errors.New("ID does not match PrivateKey")
}
kb.Lock()
kb.sks[p] = sk
kb.Unlock()
return nil
}
type peerstore struct {
keybook
addressbook
metrics
// store other data, like versions
ds ds.ThreadSafeDatastore
}
// NewPeerstore creates a threadsafe collection of peers.
func NewPeerstore() Peerstore {
return &peerstore{
keybook: *newKeybook(),
addressbook: *newAddressbook(),
metrics: *(NewMetrics()).(*metrics),
ds: dssync.MutexWrap(ds.NewMapDatastore()),
}
}
func (ps *peerstore) Put(p ID, key string, val interface{}) error {
dsk := ds.NewKey(string(p) + "/" + key)
return ps.ds.Put(dsk, val)
}
func (ps *peerstore) Get(p ID, key string) (interface{}, error) {
dsk := ds.NewKey(string(p) + "/" + key)
return ps.ds.Get(dsk)
}
func (ps *peerstore) Peers() []ID {
set := map[ID]struct{}{}
for _, p := range ps.keybook.Peers() {
set[p] = struct{}{}
}
for _, p := range ps.addressbook.Peers() {
set[p] = struct{}{}
}
pps := make([]ID, 0, len(set))
for p := range set {
pps = append(pps, p)
}
return pps
}
func (ps *peerstore) PeerInfo(p ID) PeerInfo {
return PeerInfo{
ID: p,
Addrs: ps.addressbook.Addresses(p),
}
}
func PeerInfos(ps Peerstore, peers []ID) []PeerInfo {
pi := make([]PeerInfo, len(peers))
for i, p := range peers {
pi[i] = ps.PeerInfo(p)
}
return pi
}
func PeerInfoIDs(pis []PeerInfo) []ID {
ps := make([]ID, len(pis))
for i, pi := range pis {
ps[i] = pi.ID
}
return ps
}
package peer
import (
"testing"
ma "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr"
)
func IDS(t *testing.T, ids string) ID {
id, err := IDB58Decode(ids)
if err != nil {
t.Fatal(err)
}
return id
}
func MA(t *testing.T, m string) ma.Multiaddr {
maddr, err := ma.NewMultiaddr(m)
if err != nil {
t.Fatal(err)
}
return maddr
}
func TestAddresses(t *testing.T) {
ps := NewPeerstore()
id1 := IDS(t, "QmcNstKuwBBoVTpSCSDrwzjgrRcaYXK833Psuz2EMHwyQN")
id2 := IDS(t, "QmRmPL3FDZKE3Qiwv1RosLdwdvbvg17b2hB39QPScgWKKZ")
id3 := IDS(t, "QmPhi7vBsChP7sjRoZGgg7bcKqF6MmCcQwvRbDte8aJ6Kn")
ma11 := MA(t, "/ip4/1.2.3.1/tcp/1111")
ma21 := MA(t, "/ip4/1.2.3.2/tcp/1111")
ma22 := MA(t, "/ip4/1.2.3.2/tcp/2222")
ma31 := MA(t, "/ip4/1.2.3.3/tcp/1111")
ma32 := MA(t, "/ip4/1.2.3.3/tcp/2222")
ma33 := MA(t, "/ip4/1.2.3.3/tcp/3333")
ps.AddAddress(id1, ma11)
ps.AddAddress(id2, ma21)
ps.AddAddress(id2, ma22)
ps.AddAddress(id3, ma31)
ps.AddAddress(id3, ma32)
ps.AddAddress(id3, ma33)
test := func(exp, act []ma.Multiaddr) {
if len(exp) != len(act) {
t.Fatal("lengths not the same")
}
for _, a := range exp {
found := false
for _, b := range act {
if a.Equal(b) {
found = true
break
}
}
if !found {
t.Fatal("expected address %s not found", a)
}
}
}
// test the Addresses return value
test([]ma.Multiaddr{ma11}, ps.Addresses(id1))
test([]ma.Multiaddr{ma21, ma22}, ps.Addresses(id2))
test([]ma.Multiaddr{ma31, ma32, ma33}, ps.Addresses(id3))
// test also the PeerInfo return
test([]ma.Multiaddr{ma11}, ps.PeerInfo(id1).Addrs)
test([]ma.Multiaddr{ma21, ma22}, ps.PeerInfo(id2).Addrs)
test([]ma.Multiaddr{ma31, ma32, ma33}, ps.PeerInfo(id3).Addrs)
}
package queue
import (
"container/heap"
"math/big"
"sync"
peer "github.com/jbenet/go-ipfs/p2p/peer"
ks "github.com/jbenet/go-ipfs/routing/keyspace"
u "github.com/jbenet/go-ipfs/util"
)
// peerMetric tracks a peer and its distance to something else.
type peerMetric struct {
// the peer
peer peer.ID
// big.Int for XOR metric
metric *big.Int
}
// peerMetricHeap implements a heap of peerDistances
type peerMetricHeap []*peerMetric
func (ph peerMetricHeap) Len() int {
return len(ph)
}
func (ph peerMetricHeap) Less(i, j int) bool {
return -1 == ph[i].metric.Cmp(ph[j].metric)
}
func (ph peerMetricHeap) Swap(i, j int) {
ph[i], ph[j] = ph[j], ph[i]
}
func (ph *peerMetricHeap) Push(x interface{}) {
item := x.(*peerMetric)
*ph = append(*ph, item)
}
func (ph *peerMetricHeap) Pop() interface{} {
old := *ph
n := len(old)
item := old[n-1]
*ph = old[0 : n-1]
return item
}
// distancePQ implements heap.Interface and PeerQueue
type distancePQ struct {
// from is the Key this PQ measures against
from ks.Key
// heap is a heap of peerDistance items
heap peerMetricHeap
sync.RWMutex
}
func (pq *distancePQ) Len() int {
pq.Lock()
defer pq.Unlock()
return len(pq.heap)
}
func (pq *distancePQ) Enqueue(p peer.ID) {
pq.Lock()
defer pq.Unlock()
distance := ks.XORKeySpace.Key([]byte(p)).Distance(pq.from)
heap.Push(&pq.heap, &peerMetric{
peer: p,
metric: distance,
})
}
func (pq *distancePQ) Dequeue() peer.ID {
pq.Lock()
defer pq.Unlock()
if len(pq.heap) < 1 {
panic("called Dequeue on an empty PeerQueue")
// will panic internally anyway, but we can help debug here
}
o := heap.Pop(&pq.heap)
p := o.(*peerMetric)
return p.peer
}
// NewXORDistancePQ returns a PeerQueue which maintains its peers sorted
// in terms of their distances to each other in an XORKeySpace (i.e. using
// XOR as a metric of distance).
func NewXORDistancePQ(fromKey u.Key) PeerQueue {
return &distancePQ{
from: ks.XORKeySpace.Key([]byte(fromKey)),
heap: peerMetricHeap{},
}
}
package queue
import peer "github.com/jbenet/go-ipfs/p2p/peer"
// PeerQueue maintains a set of peers ordered according to a metric.
// Implementations of PeerQueue could order peers based on distances along
// a KeySpace, latency measurements, trustworthiness, reputation, etc.
type PeerQueue interface {
// Len returns the number of items in PeerQueue
Len() int
// Enqueue adds this node to the queue.
Enqueue(peer.ID)
// Dequeue retrieves the highest (smallest int) priority node
Dequeue() peer.ID
}
package queue
import (
"fmt"
"sync"
"testing"
"time"
peer "github.com/jbenet/go-ipfs/p2p/peer"
u "github.com/jbenet/go-ipfs/util"
context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
)
func TestQueue(t *testing.T) {
p1 := peer.ID("11140beec7b5ea3f0fdbc95d0dd47f3c5bc275da8a31") // these aren't valid, because need to hex-decode.
p2 := peer.ID("11140beec7b5ea3f0fdbc95d0dd47f3c5bc275da8a32") // these aren't valid, because need to hex-decode.
p3 := peer.ID("11140beec7b5ea3f0fdbc95d0dd47f3c5bc275da8a33") // these aren't valid, because need to hex-decode.
p4 := peer.ID("11140beec7b5ea3f0fdbc95d0dd47f3c5bc275da8a34") // these aren't valid, because need to hex-decode.
p5 := peer.ID("11140beec7b5ea3f0fdbc95d0dd47f3c5bc275da8a31") // these aren't valid, because need to hex-decode.
// but they work.
// these are the peer.IDs' XORKeySpace Key values:
// [228 47 151 130 156 102 222 232 218 31 132 94 170 208 80 253 120 103 55 35 91 237 48 157 81 245 57 247 66 150 9 40]
// [26 249 85 75 54 49 25 30 21 86 117 62 85 145 48 175 155 194 210 216 58 14 241 143 28 209 129 144 122 28 163 6]
// [78 135 26 216 178 181 224 181 234 117 2 248 152 115 255 103 244 34 4 152 193 88 9 225 8 127 216 158 226 8 236 246]
// [125 135 124 6 226 160 101 94 192 57 39 12 18 79 121 140 190 154 147 55 44 83 101 151 63 255 94 179 51 203 241 51]
pq := NewXORDistancePQ(u.Key("11140beec7b5ea3f0fdbc95d0dd47f3c5bc275da8a31"))
pq.Enqueue(p3)
pq.Enqueue(p1)
pq.Enqueue(p2)
pq.Enqueue(p4)
pq.Enqueue(p5)
pq.Enqueue(p1)
// should come out as: p1, p4, p3, p2
if d := pq.Dequeue(); d != p1 && d != p5 {
t.Error("ordering failed")
}
if d := pq.Dequeue(); d != p1 && d != p5 {
t.Error("ordering failed")
}
if d := pq.Dequeue(); d != p1 && d != p5 {
t.Error("ordering failed")
}
if pq.Dequeue() != p4 {
t.Error("ordering failed")
}
if pq.Dequeue() != p3 {
t.Error("ordering failed")
}
if pq.Dequeue() != p2 {
t.Error("ordering failed")
}
}
func newPeerTime(t time.Time) peer.ID {
s := fmt.Sprintf("hmmm time: %v", t)
h := u.Hash([]byte(s))
return peer.ID(h)
}
func TestSyncQueue(t *testing.T) {
tickT := time.Microsecond * 50
max := 5000
consumerN := 10
countsIn := make([]int, consumerN*2)
countsOut := make([]int, consumerN)
if testing.Short() {
max = 1000
}
ctx := context.Background()
pq := NewXORDistancePQ(u.Key("11140beec7b5ea3f0fdbc95d0dd47f3c5bc275da8a31"))
cq := NewChanQueue(ctx, pq)
wg := sync.WaitGroup{}
produce := func(p int) {
defer wg.Done()
tick := time.Tick(tickT)
for i := 0; i < max; i++ {
select {
case tim := <-tick:
countsIn[p]++
cq.EnqChan <- newPeerTime(tim)
case <-ctx.Done():
return
}
}
}
consume := func(c int) {
defer wg.Done()
for {
select {
case <-cq.DeqChan:
countsOut[c]++
if countsOut[c] >= max*2 {
return
}
case <-ctx.Done():
return
}
}
}
// make n * 2 producers and n consumers
for i := 0; i < consumerN; i++ {
wg.Add(3)
go produce(i)
go produce(consumerN + i)
go consume(i)
}
wg.Wait()
sum := func(ns []int) int {
total := 0
for _, n := range ns {
total += n
}
return total
}
if sum(countsIn) != sum(countsOut) {
t.Errorf("didnt get all of them out: %d/%d", sum(countsOut), sum(countsIn))
}
}
package queue
import (
context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
peer "github.com/jbenet/go-ipfs/p2p/peer"
)
// ChanQueue makes any PeerQueue synchronizable through channels.
type ChanQueue struct {
Queue PeerQueue
EnqChan chan<- peer.ID
DeqChan <-chan peer.ID
}
// NewChanQueue creates a ChanQueue by wrapping pq.
func NewChanQueue(ctx context.Context, pq PeerQueue) *ChanQueue {
cq := &ChanQueue{Queue: pq}
cq.process(ctx)
return cq
}
func (cq *ChanQueue) process(ctx context.Context) {
// construct the channels here to be able to use them bidirectionally
enqChan := make(chan peer.ID)
deqChan := make(chan peer.ID)
cq.EnqChan = enqChan
cq.DeqChan = deqChan
go func() {
defer close(deqChan)
var next peer.ID
var item peer.ID
var more bool
for {
if cq.Queue.Len() == 0 {
select {
case next, more = <-enqChan:
if !more {
return
}
case <-ctx.Done():
return
}
} else {
next = cq.Queue.Dequeue()
}
select {
case item, more = <-enqChan:
if !more {
return
}
cq.Queue.Enqueue(item)
cq.Queue.Enqueue(next)
next = ""
case deqChan <- next:
next = ""
case <-ctx.Done():
return
}
}
}()
}
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment