Commit 51fd99e3 authored by Jeromy

extract from 0.4.0

parent 5a0162c7
package log
import (
"encoding/json"
"io"
logging "github.com/whyrusleeping/go-logging"
)
// PoliteJSONFormatter marshals entries into JSON encoded slices (without
// overwriting user-provided keys). How polite of it!
type PoliteJSONFormatter struct{}
func (f *PoliteJSONFormatter) Format(calldepth int, r *logging.Record, w io.Writer) error {
entry := make(map[string]interface{})
entry["id"] = r.Id
entry["level"] = r.Level
entry["time"] = r.Time
entry["module"] = r.Module
entry["message"] = r.Message()
err := json.NewEncoder(w).Encode(entry)
if err != nil {
return err
}
w.Write([]byte{'\n'})
return nil
}
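// A minimal wiring sketch (assumption: go-logging's package-level SetFormatter
// accepts any value implementing the Format method above; the sample values
// below are illustrative):
//
//	logging.SetFormatter(&PoliteJSONFormatter{})
//
// Each record is then emitted as one JSON object per line, e.g.
//
//	{"id":1,"level":"INFO","module":"eventlog","time":"...","message":"hello"}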
package log
import (
"fmt"
"io"
"sync"
)
var MaxWriterBuffer = 512 * 1024
var log = Logger("eventlog")
type MirrorWriter struct {
active bool
activelk sync.Mutex
// channel for incoming writers
writerAdd chan *writerAdd
// slice of buffered writers, one per attached output
writers []*bufWriter
// synchronization channel for incoming writes
msgSync chan []byte
}
type writerSync struct {
w io.WriteCloser
br chan []byte
}
func NewMirrorWriter() *MirrorWriter {
mw := &MirrorWriter{
msgSync: make(chan []byte, 64), // sufficiently large buffer to avoid callers waiting
writerAdd: make(chan *writerAdd),
}
go mw.logRoutine()
return mw
}
func (mw *MirrorWriter) Write(b []byte) (int, error) {
mycopy := make([]byte, len(b))
copy(mycopy, b)
mw.msgSync <- mycopy
return len(b), nil
}
func (mw *MirrorWriter) Close() error {
// it is up to the caller to ensure that write is not called during or
// after close is called.
close(mw.msgSync)
return nil
}
func (mw *MirrorWriter) doClose() {
for _, w := range mw.writers {
w.writer.Close()
}
}
func (mw *MirrorWriter) logRoutine() {
// rebind to avoid races on nilling out struct fields
msgSync := mw.msgSync
writerAdd := mw.writerAdd
defer mw.doClose()
for {
select {
case b, ok := <-msgSync:
if !ok {
return
}
// write to all writers
dropped := mw.broadcastMessage(b)
// consolidate the slice
if dropped {
mw.clearDeadWriters()
}
case wa := <-writerAdd:
mw.writers = append(mw.writers, newBufWriter(wa.w))
mw.activelk.Lock()
mw.active = true
mw.activelk.Unlock()
close(wa.done)
}
}
}
// broadcastMessage sends the given message to every writer
// if any writer is killed during the send, 'true' is returned
func (mw *MirrorWriter) broadcastMessage(b []byte) bool {
var dropped bool
for i, w := range mw.writers {
_, err := w.Write(b)
if err != nil {
mw.writers[i] = nil
dropped = true
}
}
return dropped
}
func (mw *MirrorWriter) clearDeadWriters() {
writers := mw.writers
mw.writers = nil
for _, w := range writers {
if w != nil {
mw.writers = append(mw.writers, w)
}
}
if len(mw.writers) == 0 {
mw.activelk.Lock()
mw.active = false
mw.activelk.Unlock()
}
}
type writerAdd struct {
w io.WriteCloser
done chan struct{}
}
func (mw *MirrorWriter) AddWriter(w io.WriteCloser) {
wa := &writerAdd{
w: w,
done: make(chan struct{}),
}
mw.writerAdd <- wa
<-wa.done
}
func (mw *MirrorWriter) Active() (active bool) {
mw.activelk.Lock()
active = mw.active
mw.activelk.Unlock()
return
}
func newBufWriter(w io.WriteCloser) *bufWriter {
bw := &bufWriter{
writer: w,
incoming: make(chan []byte, 1),
}
go bw.loop()
return bw
}
type bufWriter struct {
writer io.WriteCloser
incoming chan []byte
deathLock sync.Mutex
dead bool
}
var errDeadWriter = fmt.Errorf("writer is dead")
func (bw *bufWriter) Write(b []byte) (int, error) {
bw.deathLock.Lock()
dead := bw.dead
bw.deathLock.Unlock()
if dead {
if bw.incoming != nil {
close(bw.incoming)
bw.incoming = nil
}
return 0, errDeadWriter
}
bw.incoming <- b
return len(b), nil
}
func (bw *bufWriter) die() {
bw.deathLock.Lock()
bw.dead = true
bw.writer.Close()
bw.deathLock.Unlock()
}
func (bw *bufWriter) loop() {
bufsize := 0
bufBase := make([][]byte, 0, 16) // some initial memory
buffered := bufBase
nextCh := make(chan []byte)
var nextMsg []byte
go func() {
for b := range nextCh {
_, err := bw.writer.Write(b)
if err != nil {
log.Info("eventlog write error: %s", err)
bw.die()
return
}
}
}()
// collect and buffer messages
incoming := bw.incoming
for {
if nextMsg == nil || nextCh == nil {
// nextCh == nil implies we are 'dead' and draining the incoming channel
// until the caller notices and closes it for us
select {
case b, ok := <-incoming:
if !ok {
return
}
nextMsg = b
}
}
select {
case b, ok := <-incoming:
if !ok {
return
}
bufsize += len(b)
buffered = append(buffered, b)
if bufsize > MaxWriterBuffer {
// if we have too many messages buffered, kill the writer
bw.die()
close(nextCh)
nextCh = nil
// explicitly keep going here to drain incoming
}
case nextCh <- nextMsg:
nextMsg = nil
if len(buffered) > 0 {
nextMsg = buffered[0]
buffered = buffered[1:]
bufsize -= len(nextMsg)
}
if len(buffered) == 0 {
// reset slice position
buffered = bufBase[:0]
}
}
}
}
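// A short usage sketch of MirrorWriter (illustrative): every writer added via
// AddWriter receives a copy of each Write; writers that error out, or that fall
// more than MaxWriterBuffer bytes behind, are dropped instead of blocking the rest.
//
//	mw := NewMirrorWriter()
//	pr, pw := io.Pipe()
//	mw.AddWriter(pw)            // mirror output into the pipe
//	mw.Write([]byte("hello\n")) // fans out to all attached writers
//	// ... read from pr elsewhere ...
//	mw.Close()                  // closes every attached writer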
package log
import (
"fmt"
"hash/fnv"
"io"
"sync"
"testing"
"time"
randbo "github.com/dustin/randbo"
)
type hangwriter struct {
c chan struct{}
}
func newHangWriter() *hangwriter {
return &hangwriter{make(chan struct{})}
}
func (hw *hangwriter) Write([]byte) (int, error) {
<-make(chan struct{})
return 0, fmt.Errorf("write on closed writer")
}
func (hw *hangwriter) Close() error {
close(hw.c)
return nil
}
func TestMirrorWriterHang(t *testing.T) {
mw := NewMirrorWriter()
hw := newHangWriter()
pr, pw := io.Pipe()
mw.AddWriter(hw)
mw.AddWriter(pw)
msg := "Hello!"
mw.Write([]byte(msg))
// make sure writes through can happen even with one writer hanging
done := make(chan struct{})
go func() {
buf := make([]byte, 10)
n, err := pr.Read(buf)
// t.Fatal must not be called from a non-test goroutine; use t.Error instead
if err != nil {
t.Error(err)
}
if n != len(msg) {
t.Error("read wrong amount")
}
if string(buf[:n]) != msg {
t.Error("didn't read right content")
}
done <- struct{}{}
}()
select {
case <-time.After(time.Second * 5):
t.Fatal("write to mirrorwriter hung")
case <-done:
}
if !mw.Active() {
t.Fatal("writer should still be active")
}
pw.Close()
if !mw.Active() {
t.Fatal("writer should still be active")
}
// now we just have the hangwriter
// write a bunch to it
buf := make([]byte, 8192)
for i := 0; i < 128; i++ {
mw.Write(buf)
}
// wait for goroutines to sync up
time.Sleep(time.Millisecond * 500)
// the hangwriter should have been killed, causing the mirrorwriter to be inactive now
if mw.Active() {
t.Fatal("should be inactive now")
}
}
func TestStress(t *testing.T) {
mw := NewMirrorWriter()
nreaders := 20
var readers []io.Reader
for i := 0; i < nreaders; i++ {
pr, pw := io.Pipe()
mw.AddWriter(pw)
readers = append(readers, pr)
}
hashout := make(chan []byte)
numwriters := 20
writesize := 1024
writecount := 300
f := func(r io.Reader) {
h := fnv.New64a()
sum, err := io.Copy(h, r)
// t.Fatal must not be called from a non-test goroutine; report and still send the hash
if err != nil {
t.Error(err)
}
if sum != int64(numwriters*writesize*writecount) {
t.Error("read wrong number of bytes")
}
hashout <- h.Sum(nil)
}
for _, r := range readers {
go f(r)
}
work := sync.WaitGroup{}
for i := 0; i < numwriters; i++ {
work.Add(1)
go func() {
defer work.Done()
r := randbo.New()
buf := make([]byte, writesize)
for j := 0; j < writecount; j++ {
r.Read(buf)
mw.Write(buf)
time.Sleep(time.Millisecond * 5)
}
}()
}
work.Wait()
mw.Close()
check := make(map[string]bool)
for i := 0; i < nreaders; i++ {
h := <-hashout
check[string(h)] = true
}
if len(check) > 1 {
t.Fatal("writers received different data!")
}
}
package keyspace
import (
"sort"
"math/big"
)
// Key represents an identifier in a KeySpace. It holds a reference to the
// associated KeySpace, as well references to both the Original identifier,
// as well as the new, KeySpace Bytes one.
type Key struct {
// Space is the KeySpace this Key is related to.
Space KeySpace
// Original is the original value of the identifier
Original []byte
// Bytes is the new value of the identifier, in the KeySpace.
Bytes []byte
}
// Equal returns whether this key is equal to another.
func (k1 Key) Equal(k2 Key) bool {
if k1.Space != k2.Space {
panic("k1 and k2 not in same key space.")
}
return k1.Space.Equal(k1, k2)
}
// Less returns whether this key comes before another.
func (k1 Key) Less(k2 Key) bool {
if k1.Space != k2.Space {
panic("k1 and k2 not in same key space.")
}
return k1.Space.Less(k1, k2)
}
// Distance returns this key's distance to another
func (k1 Key) Distance(k2 Key) *big.Int {
if k1.Space != k2.Space {
panic("k1 and k2 not in same key space.")
}
return k1.Space.Distance(k1, k2)
}
// KeySpace is an object used to do math on identifiers. Each keyspace has its
// own properties and rules. See XorKeySpace.
type KeySpace interface {
// Key converts an identifier into a Key in this space.
Key([]byte) Key
// Equal returns whether keys are equal in this key space
Equal(Key, Key) bool
// Distance returns the distance metric in this key space
Distance(Key, Key) *big.Int
// Less returns whether the first key is smaller than the second.
Less(Key, Key) bool
}
// byDistanceToCenter is a type used to sort Keys by proximity to a center.
type byDistanceToCenter struct {
Center Key
Keys []Key
}
func (s byDistanceToCenter) Len() int {
return len(s.Keys)
}
func (s byDistanceToCenter) Swap(i, j int) {
s.Keys[i], s.Keys[j] = s.Keys[j], s.Keys[i]
}
func (s byDistanceToCenter) Less(i, j int) bool {
a := s.Center.Distance(s.Keys[i])
b := s.Center.Distance(s.Keys[j])
return a.Cmp(b) == -1
}
// SortByDistance takes a KeySpace, a center Key, and a list of Keys toSort.
// It returns a new list, where the Keys toSort have been sorted by their
// distance to the center Key.
func SortByDistance(sp KeySpace, center Key, toSort []Key) []Key {
toSortCopy := make([]Key, len(toSort))
copy(toSortCopy, toSort)
bdtc := &byDistanceToCenter{
Center: center,
Keys: toSortCopy, // copy
}
sort.Sort(bdtc)
return bdtc.Keys
}
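// A brief usage sketch (illustrative; XORKeySpace is the concrete KeySpace
// defined in this package):
//
//	center := XORKeySpace.Key([]byte("some identifier"))
//	others := []Key{
//		XORKeySpace.Key([]byte("a")),
//		XORKeySpace.Key([]byte("b")),
//	}
//	sorted := SortByDistance(XORKeySpace, center, others)
//	closest := sorted[0] // the key nearest to center in this space's distance metric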
{
"name": "go-keyspace",
"author": "joe",
"version": "1.0.0",
"language": "go"
}
package keyspace
import (
"bytes"
"crypto/sha256"
"math/big"
)
// XORKeySpace is a KeySpace which:
// - normalizes identifiers using a cryptographic hash (sha256)
// - measures distance by XORing keys together
var XORKeySpace = &xorKeySpace{}
var _ KeySpace = XORKeySpace // ensure it conforms
type xorKeySpace struct{}
// Key converts an identifier into a Key in this space.
func (s *xorKeySpace) Key(id []byte) Key {
hash := sha256.Sum256(id)
key := hash[:]
return Key{
Space: s,
Original: id,
Bytes: key,
}
}
// Equal returns whether keys are equal in this key space
func (s *xorKeySpace) Equal(k1, k2 Key) bool {
return bytes.Equal(k1.Bytes, k2.Bytes)
}
// Distance returns the distance metric in this key space
func (s *xorKeySpace) Distance(k1, k2 Key) *big.Int {
// XOR the keys
k3 := XOR(k1.Bytes, k2.Bytes)
// interpret it as an integer
dist := big.NewInt(0).SetBytes(k3)
return dist
}
// Less returns whether the first key is smaller than the second.
func (s *xorKeySpace) Less(k1, k2 Key) bool {
a := k1.Bytes
b := k2.Bytes
for i := 0; i < len(a); i++ {
if a[i] != b[i] {
return a[i] < b[i]
}
}
return true
}
// ZeroPrefixLen returns the number of leading zero bits in a byte slice.
func ZeroPrefixLen(id []byte) int {
for i := 0; i < len(id); i++ {
for j := 0; j < 8; j++ {
if (id[i]>>uint8(7-j))&0x1 != 0 {
return i*8 + j
}
}
}
return len(id) * 8
}
// XOR takes two byte slices, XORs them together, returns the resulting slice.
func XOR(a, b []byte) []byte {
c := make([]byte, len(a))
for i := 0; i < len(a); i++ {
c[i] = a[i] ^ b[i]
}
return c
}
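// Worked examples for the helpers above (results can be checked by hand):
//
//	XOR([]byte{0xF0}, []byte{0x0F})   // -> []byte{0xFF}
//	ZeroPrefixLen([]byte{0x00, 0x58}) // -> 9: eight zero bits, then one more before the first 1 (0x58 = 01011000)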
package keyspace
import (
"bytes"
"math/big"
"testing"
)
func TestPrefixLen(t *testing.T) {
cases := [][]byte{
{0x00, 0x00, 0x00, 0x80, 0x00, 0x00, 0x00},
{0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00},
{0x00, 0x58, 0xFF, 0x80, 0x00, 0x00, 0xF0},
}
lens := []int{24, 56, 9}
for i, c := range cases {
r := ZeroPrefixLen(c)
if r != lens[i] {
t.Errorf("ZeroPrefixLen failed: %v != %v", r, lens[i])
}
}
}
func TestXorKeySpace(t *testing.T) {
ids := [][]byte{
{0xFF, 0xFF, 0xFF, 0xFF},
{0x00, 0x00, 0x00, 0x00},
{0xFF, 0xFF, 0xFF, 0xF0},
}
ks := [][2]Key{
{XORKeySpace.Key(ids[0]), XORKeySpace.Key(ids[0])},
{XORKeySpace.Key(ids[1]), XORKeySpace.Key(ids[1])},
{XORKeySpace.Key(ids[2]), XORKeySpace.Key(ids[2])},
}
for i, set := range ks {
if !set[0].Equal(set[1]) {
t.Errorf("Key not eq. %v != %v", set[0], set[1])
}
if !bytes.Equal(set[0].Bytes, set[1].Bytes) {
t.Errorf("Key gen failed. %v != %v", set[0].Bytes, set[1].Bytes)
}
if !bytes.Equal(set[0].Original, ids[i]) {
t.Errorf("ptrs to original. %v != %v", set[0].Original, ids[i])
}
if len(set[0].Bytes) != 32 {
t.Errorf("key length incorrect. 32 != %d", len(set[0].Bytes))
}
}
for i := 1; i < len(ks); i++ {
if ks[i][0].Less(ks[i-1][0]) == ks[i-1][0].Less(ks[i][0]) {
t.Errorf("less should be different.")
}
if ks[i][0].Distance(ks[i-1][0]).Cmp(ks[i-1][0].Distance(ks[i][0])) != 0 {
t.Errorf("distance should be the same.")
}
if ks[i][0].Equal(ks[i-1][0]) {
t.Errorf("Keys should not be eq. %v != %v", ks[i][0], ks[i-1][0])
}
}
}
func TestDistancesAndCenterSorting(t *testing.T) {
adjs := [][]byte{
{173, 149, 19, 27, 192, 183, 153, 192, 177, 175, 71, 127, 177, 79, 207, 38, 166, 169, 247, 96, 121, 228, 139, 240, 144, 172, 183, 232, 54, 123, 253, 14},
{223, 63, 97, 152, 4, 169, 47, 219, 64, 87, 25, 45, 196, 61, 215, 72, 234, 119, 138, 220, 82, 188, 73, 140, 232, 5, 36, 192, 20, 184, 17, 25},
{73, 176, 221, 176, 149, 143, 22, 42, 129, 124, 213, 114, 232, 95, 189, 154, 18, 3, 122, 132, 32, 199, 53, 185, 58, 157, 117, 78, 52, 146, 157, 127},
{73, 176, 221, 176, 149, 143, 22, 42, 129, 124, 213, 114, 232, 95, 189, 154, 18, 3, 122, 132, 32, 199, 53, 185, 58, 157, 117, 78, 52, 146, 157, 127},
{73, 176, 221, 176, 149, 143, 22, 42, 129, 124, 213, 114, 232, 95, 189, 154, 18, 3, 122, 132, 32, 199, 53, 185, 58, 157, 117, 78, 52, 146, 157, 126},
{73, 0, 221, 176, 149, 143, 22, 42, 129, 124, 213, 114, 232, 95, 189, 154, 18, 3, 122, 132, 32, 199, 53, 185, 58, 157, 117, 78, 52, 146, 157, 127},
}
keys := make([]Key, len(adjs))
for i, a := range adjs {
keys[i] = Key{Space: XORKeySpace, Bytes: a}
}
cmp := func(a int64, b *big.Int) int {
return big.NewInt(a).Cmp(b)
}
if 0 != cmp(0, keys[2].Distance(keys[3])) {
t.Errorf("distance calculation wrong: %v", keys[2].Distance(keys[3]))
}
if 0 != cmp(1, keys[2].Distance(keys[4])) {
t.Errorf("distance calculation wrong: %v", keys[2].Distance(keys[4]))
}
d1 := keys[2].Distance(keys[5])
d2 := XOR(keys[2].Bytes, keys[5].Bytes)
d2 = d2[len(keys[2].Bytes)-len(d1.Bytes()):] // skip empty space for big
if !bytes.Equal(d1.Bytes(), d2) {
t.Errorf("bytes should be the same. %v == %v", d1.Bytes(), d2)
}
if -1 != cmp(2<<32, keys[2].Distance(keys[5])) {
t.Errorf("2<<32 should be smaller")
}
keys2 := SortByDistance(XORKeySpace, keys[2], keys)
order := []int{2, 3, 4, 5, 1, 0}
for i, o := range order {
if !bytes.Equal(keys[o].Bytes, keys2[i].Bytes) {
t.Errorf("order is wrong. %d?? %v == %v", o, keys[o], keys2[i])
}
}
}
// Package notifier provides a simple notification dispatcher
// meant to be embedded in larger structures that wish to allow
// clients to sign up for event notifications.
package notifier
import (
"sync"
process "github.com/jbenet/goprocess"
ratelimit "github.com/jbenet/goprocess/ratelimit"
)
// Notifiee is a generic interface. Clients implement
// their own Notifiee interfaces to ensure type-safety
// of notifications:
//
// type RocketNotifiee interface{
// Countdown(r Rocket, countdown time.Duration)
// LiftedOff(Rocket)
// ReachedOrbit(Rocket)
// Detached(Rocket, Capsule)
// Landed(Rocket)
// }
//
type Notifiee interface{}
// Notifier is a notification dispatcher. It's meant
// to be composed, and its zero-value is ready to be used.
//
// type Rocket struct {
// notifier notifier.Notifier
// }
//
type Notifier struct {
mu sync.RWMutex // guards notifiees
nots map[Notifiee]struct{}
lim *ratelimit.RateLimiter
}
// RateLimited returns a rate limited Notifier. At most limit goroutines
// will be spawned. If limit is zero, no rate limiting happens; this
// is the same as `Notifier{}`.
func RateLimited(limit int) Notifier {
n := Notifier{}
if limit > 0 {
n.lim = ratelimit.NewRateLimiter(process.Background(), limit)
}
return n
}
// Notify signs up Notifiee e for notifications. This function
// is meant to be called behind your own type-safe function(s):
//
// // generic function for pattern-following
// func (r *Rocket) Notify(n Notifiee) {
// r.notifier.Notify(n)
// }
//
// // or as part of other functions
// func (r *Rocket) Onboard(a Astronaut) {
// r.astronauts = append(r.astronauts, a)
// r.notifier.Notify(a)
// }
//
func (n *Notifier) Notify(e Notifiee) {
n.mu.Lock()
if n.nots == nil { // so that zero-value is ready to be used.
n.nots = make(map[Notifiee]struct{})
}
n.nots[e] = struct{}{}
n.mu.Unlock()
}
// StopNotify stops notifying Notifiee e. This function
// is meant to be called behind your own type-safe function(s):
//
// // generic function for pattern-following
// func (r *Rocket) StopNotify(n Notifiee) {
// r.notifier.StopNotify(n)
// }
//
// // or as part of other functions
// func (r *Rocket) Detach(c Capsule) {
// r.notifier.StopNotify(c)
// r.capsule = nil
// }
//
func (n *Notifier) StopNotify(e Notifiee) {
n.mu.Lock()
if n.nots != nil { // so that zero-value is ready to be used.
delete(n.nots, e)
}
n.mu.Unlock()
}
// NotifyAll messages the notifier's notifiees with a given notification.
// This is done by calling the given function with each notifiee. It is
// meant to be called with your own type-safe notification functions:
//
// func (r *Rocket) Launch() {
// r.notifyAll(func(n Notifiee) {
// n.Launched(r)
// })
// }
//
// // make it private so only you can use it. This function is necessary
// // to make sure you only up-cast in one place. You control who you added
// // to be a notifiee. If Go adds generics, maybe we can get rid of this
// // method but for now it is like wrapping a type-less container with
// // a type safe interface.
// func (r *Rocket) notifyAll(notify func(Notifiee)) {
// r.notifier.NotifyAll(func(n notifier.Notifiee) {
// notify(n.(Notifiee))
// })
// }
//
// Note well: each notification is launched in its own goroutine, so they
// can be processed concurrently, and so that whatever the notification does
// it _never_ blocks out the client. This is so that consumers _cannot_ add
// hooks into your object that block you accidentally.
func (n *Notifier) NotifyAll(notify func(Notifiee)) {
n.mu.Lock()
defer n.mu.Unlock()
if n.nots == nil { // so that zero-value is ready to be used.
return
}
// no rate limiting.
if n.lim == nil {
for notifiee := range n.nots {
go notify(notifiee)
}
return
}
// with rate limiting.
n.lim.Go(func(worker process.Process) {
for notifiee := range n.nots {
notifiee := notifiee // rebind so the closure captures this iteration's value
n.lim.LimitedGo(func(worker process.Process) {
notify(notifiee)
})
}
})
}
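// A compact sketch of the intended embedding, following the Rocket pattern from
// the comments above (Thing and its methods are hypothetical):
//
//	type Thing struct{ notifier Notifier }
//
//	func (t *Thing) Notify(n Notifiee)     { t.notifier.Notify(n) }
//	func (t *Thing) StopNotify(n Notifiee) { t.notifier.StopNotify(n) }
//
// To bound the number of concurrent notification goroutines, construct the
// embedded notifier with RateLimited, e.g. Thing{notifier: RateLimited(8)}.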
package notifier
import (
"fmt"
"sync"
"testing"
"time"
)
// test data structures
type Router struct {
queue chan Packet
notifier Notifier
}
type Packet struct{}
type RouterNotifiee interface {
Enqueued(*Router, Packet)
Forwarded(*Router, Packet)
Dropped(*Router, Packet)
}
func (r *Router) Notify(n RouterNotifiee) {
r.notifier.Notify(n)
}
func (r *Router) StopNotify(n RouterNotifiee) {
r.notifier.StopNotify(n)
}
func (r *Router) notifyAll(notify func(n RouterNotifiee)) {
r.notifier.NotifyAll(func(n Notifiee) {
notify(n.(RouterNotifiee))
})
}
func (r *Router) Receive(p Packet) {
select {
case r.queue <- p: // enqueued
r.notifyAll(func(n RouterNotifiee) {
n.Enqueued(r, p)
})
default: // drop
r.notifyAll(func(n RouterNotifiee) {
n.Dropped(r, p)
})
}
}
func (r *Router) Forward() {
p := <-r.queue
r.notifyAll(func(n RouterNotifiee) {
n.Forwarded(r, p)
})
}
type Metrics struct {
enqueued int
forwarded int
dropped int
received chan struct{}
sync.Mutex
}
func (m *Metrics) Enqueued(*Router, Packet) {
m.Lock()
m.enqueued++
m.Unlock()
if m.received != nil {
m.received <- struct{}{}
}
}
func (m *Metrics) Forwarded(*Router, Packet) {
m.Lock()
m.forwarded++
m.Unlock()
if m.received != nil {
m.received <- struct{}{}
}
}
func (m *Metrics) Dropped(*Router, Packet) {
m.Lock()
m.dropped++
m.Unlock()
if m.received != nil {
m.received <- struct{}{}
}
}
func (m *Metrics) String() string {
m.Lock()
defer m.Unlock()
return fmt.Sprintf("%d enqueued, %d forwarded, %d in queue, %d dropped",
m.enqueued, m.forwarded, m.enqueued-m.forwarded, m.dropped)
}
func TestNotifies(t *testing.T) {
m := Metrics{received: make(chan struct{})}
r := Router{queue: make(chan Packet, 10)}
r.Notify(&m)
for i := 0; i < 10; i++ {
r.Receive(Packet{})
<-m.received
if m.enqueued != (1 + i) {
t.Error("not notifying correctly", m.enqueued, 1+i)
}
}
for i := 0; i < 10; i++ {
r.Receive(Packet{})
<-m.received
if m.enqueued != 10 {
t.Error("not notifying correctly", m.enqueued, 10)
}
if m.dropped != (1 + i) {
t.Error("not notifying correctly", m.dropped, 1+i)
}
}
}
func TestStopsNotifying(t *testing.T) {
m := Metrics{received: make(chan struct{})}
r := Router{queue: make(chan Packet, 10)}
r.Notify(&m)
for i := 0; i < 5; i++ {
r.Receive(Packet{})
<-m.received
if m.enqueued != (1 + i) {
t.Error("not notifying correctly")
}
}
r.StopNotify(&m)
for i := 0; i < 5; i++ {
r.Receive(Packet{})
select {
case <-m.received:
t.Error("did not stop notifying")
default:
}
if m.enqueued != 5 {
t.Error("did not stop notifying")
}
}
}
func TestThreadsafe(t *testing.T) {
N := 1000
r := Router{queue: make(chan Packet, 10)}
m1 := Metrics{received: make(chan struct{})}
m2 := Metrics{received: make(chan struct{})}
m3 := Metrics{received: make(chan struct{})}
r.Notify(&m1)
r.Notify(&m2)
r.Notify(&m3)
var n int
var wg sync.WaitGroup
for i := 0; i < N; i++ {
n++
wg.Add(1)
go func() {
defer wg.Done()
r.Receive(Packet{})
}()
if i%3 == 0 {
n++
wg.Add(1)
go func() {
defer wg.Done()
r.Forward()
}()
}
}
// drain queues
for i := 0; i < (n * 3); i++ {
select {
case <-m1.received:
case <-m2.received:
case <-m3.received:
}
}
wg.Wait()
// counts should be correct and all agree. and this should
// run fine under `go test -race -cpu=5`
t.Log("m1", m1.String())
t.Log("m2", m2.String())
t.Log("m3", m3.String())
if m1.String() != m2.String() || m2.String() != m3.String() {
t.Error("counts disagree")
}
}
type highwatermark struct {
mu sync.Mutex
mark int
limit int
errs chan error
}
func (m *highwatermark) incr() {
m.mu.Lock()
m.mark++
// fmt.Println("incr", m.mark)
if m.mark > m.limit {
m.errs <- fmt.Errorf("went over limit: %d/%d", m.mark, m.limit)
}
m.mu.Unlock()
}
func (m *highwatermark) decr() {
m.mu.Lock()
m.mark--
// fmt.Println("decr", m.mark)
if m.mark < 0 {
m.errs <- fmt.Errorf("went under zero: %d/%d", m.mark, m.limit)
}
m.mu.Unlock()
}
func TestLimited(t *testing.T) {
timeout := 10 * time.Second // huge timeout.
limit := 9
hwm := highwatermark{limit: limit, errs: make(chan error, 100)}
n := RateLimited(limit) // will stop after 3 rounds
n.Notify(1)
n.Notify(2)
n.Notify(3)
entr := make(chan struct{})
exit := make(chan struct{})
done := make(chan struct{})
go func() {
for i := 0; i < 10; i++ {
// fmt.Printf("round: %d\n", i)
n.NotifyAll(func(e Notifiee) {
hwm.incr()
entr <- struct{}{}
<-exit // wait
hwm.decr()
})
}
done <- struct{}{}
}()
for i := 0; i < 30; {
select {
case <-entr:
continue // let as many enter as possible
case <-time.After(1 * time.Millisecond):
}
// let one exit
select {
case <-entr:
continue // in case of timing issues.
case exit <- struct{}{}:
case <-time.After(timeout):
t.Error("got stuck")
}
i++
}
select {
case <-done: // two parts done
case <-time.After(timeout):
t.Error("did not finish")
}
close(hwm.errs)
for err := range hwm.errs {
t.Error(err)
}
}
...
@@ -9,9 +9,9 @@ package loggables
 import (
 	"net"
-	ma "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr"
+	ma "github.com/jbenet/go-multiaddr"
-	logging "github.com/ipfs/go-ipfs/vendor/go-log-v1.0.0"
+	logging "QmWRypnfEwrgH4k93KEHN5hng7VjKYkWmzDYRuTZeh2Mgh/go-log"
 	peer "github.com/ipfs/go-libp2p/p2p/peer"
 )
...
package util
import "os"
func FileExists(filename string) bool {
fi, err := os.Lstat(filename)
if fi != nil || (err != nil && !os.IsNotExist(err)) {
return true
}
return false
}
package util
import "testing"
func TestFileDoesNotExist(t *testing.T) {
t.Parallel()
if FileExists("i would be surprised to discover that this file exists") {
t.Fail()
}
}
// Package ci implements some helper functions to use during
// tests. Many times certain facilities are not available, or tests
// must run differently.
package ci
import (
"os"
jenkins "util/testutil/ci/jenkins"
travis "util/testutil/ci/travis"
)
// EnvVar is a type to use CI env var names with
// the type system.
type EnvVar string
// Environment variables used on CI and by the test suite.
const (
VarCI EnvVar = "CI"
VarNoFuse EnvVar = "TEST_NO_FUSE"
VarVerbose EnvVar = "TEST_VERBOSE"
)
// IsRunning attempts to determine whether this process is
// running on CI. This is done by checking any of:
//
// CI=true
// travis.IsRunning()
// jenkins.IsRunning()
//
func IsRunning() bool {
if os.Getenv(string(VarCI)) == "true" {
return true
}
return travis.IsRunning() || jenkins.IsRunning()
}
// Env returns the value of a CI env variable.
func Env(v EnvVar) string {
return os.Getenv(string(v))
}
// NoFuse returns whether FUSE is explicitly disabled with TEST_NO_FUSE.
func NoFuse() bool {
return os.Getenv(string(VarNoFuse)) == "1"
}
// Verbose returns whether TEST_VERBOSE is enabled.
func Verbose() bool {
return os.Getenv(string(VarVerbose)) == "1"
}
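// Typical use in a test (sketch; the skip message is up to the caller):
//
//	func TestNeedsFuse(t *testing.T) {
//		if ci.NoFuse() {
//			t.Skip("FUSE disabled via TEST_NO_FUSE")
//		}
//		// ...
//	}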
// Package jenkins implements some helper functions to use during
// tests. Many times certain facilities are not available, or tests
// must run differently.
package jenkins
import (
"os"
"strings"
)
// EnvVar is a type to use Jenkins-only env var names with
// the type system.
type EnvVar string
// Environment variables that Jenkins uses.
const (
VarBuildNumber EnvVar = "BUILD_NUMBER"
VarBuildId EnvVar = "BUILD_ID"
VarBuildUrl EnvVar = "BUILD_URL"
VarNodeName EnvVar = "NODE_NAME"
VarJobName EnvVar = "JOB_NAME"
VarBuildTag EnvVar = "BUILD_TAG"
VarJenkinsUrl EnvVar = "JENKINS_URL"
VarExecutorNumber EnvVar = "EXECUTOR_NUMBER"
VarJavaHome EnvVar = "JAVA_HOME"
VarWorkspace EnvVar = "WORKSPACE"
VarSvnRevision EnvVar = "SVN_REVISION"
VarCvsBranch EnvVar = "CVS_BRANCH"
VarGitCommit EnvVar = "GIT_COMMIT"
VarGitUrl EnvVar = "GIT_URL"
VarGitBranch EnvVar = "GIT_BRANCH"
)
// IsRunning attempts to determine whether this process is
// running on Jenkins CI. This is done by checking any of the
// following:
//
// JENKINS_URL is set
// BuildTag has prefix "jenkins-"
//
func IsRunning() bool {
return len(Env(VarJenkinsUrl)) > 0 || strings.HasPrefix(Env(VarBuildTag), "jenkins-")
}
// Env returns the value of a Jenkins env variable.
func Env(v EnvVar) string {
return os.Getenv(string(v))
}
// JobName returns the jenkins JOB_NAME of this build.
func JobName() string {
return Env(VarJobName)
}
// BuildTag returns the jenkins BUILD_TAG.
func BuildTag() string {
return Env(VarBuildTag)
}
package jenkins
import (
"os"
"strings"
"testing"
)
func TestIsRunning(t *testing.T) {
hasPrefix := strings.HasPrefix(os.Getenv("BUILD_TAG"), "jenkins-")
tr := len(os.Getenv("JENKINS_URL")) > 0 || hasPrefix
if tr != IsRunning() {
t.Error("IsRunning() does not match TRAVIS && CI env var check")
}
}
// Package travis implements some helper functions to use during
// tests. Many times certain facilities are not available, or tests
// must run differently.
package travis
import "os"
// EnvVar is a type to use travis-only env var names with
// the type system.
type EnvVar string
// Environment variables that TravisCI uses.
const (
VarCI EnvVar = "CI"
VarTravis EnvVar = "TRAVIS"
VarBranch EnvVar = "TRAVIS_BRANCH"
VarBuildDir EnvVar = "TRAVIS_BUILD_DIR"
VarBuildId EnvVar = "TRAVIS_BUILD_ID"
VarBuildNumber EnvVar = "TRAVIS_BUILD_NUMBER"
VarCommit EnvVar = "TRAVIS_COMMIT"
VarCommitRange EnvVar = "TRAVIS_COMMIT_RANGE"
VarJobId EnvVar = "TRAVIS_JOB_ID"
VarJobNumber EnvVar = "TRAVIS_JOB_NUMBER"
VarPullRequest EnvVar = "TRAVIS_PULL_REQUEST"
VarSecureEnvVars EnvVar = "TRAVIS_SECURE_ENV_VARS"
VarRepoSlug EnvVar = "TRAVIS_REPO_SLUG"
VarOsName EnvVar = "TRAVIS_OS_NAME"
VarTag EnvVar = "TRAVIS_TAG"
VarGoVersion EnvVar = "TRAVIS_GO_VERSION"
)
// IsRunning attempts to determine whether this process is
// running on Travis-CI. This is done by checking ALL of the
// following env vars are set:
//
// CI=true
// TRAVIS=true
//
// Note: cannot just check CI.
func IsRunning() bool {
return Env(VarCI) == "true" && Env(VarTravis) == "true"
}
// Env returns the value of a travis env variable.
func Env(v EnvVar) string {
return os.Getenv(string(v))
}
// JobId returns the travis JOB_ID of this build.
func JobId() string {
return Env(VarJobId)
}
// JobNumber returns the travis JOB_NUMBER of this build.
func JobNumber() string {
return Env(VarJobNumber)
}
package travis
import (
"os"
"testing"
)
func TestIsRunning(t *testing.T) {
tr := os.Getenv("TRAVIS") == "true" && os.Getenv("CI") == "true"
if tr != IsRunning() {
t.Error("IsRunning() does not match TRAVIS && CI env var check")
}
}
package testutil
import (
"bytes"
"errors"
"fmt"
"io"
"sync"
"testing"
ci "github.com/ipfs/go-libp2p/p2p/crypto"
peer "github.com/ipfs/go-libp2p/p2p/peer"
u "util"
ma "github.com/jbenet/go-multiaddr"
)
// ZeroLocalTCPAddress is the "zero" tcp local multiaddr. This means:
// /ip4/127.0.0.1/tcp/0
var ZeroLocalTCPAddress ma.Multiaddr
func init() {
// initialize ZeroLocalTCPAddress
maddr, err := ma.NewMultiaddr("/ip4/127.0.0.1/tcp/0")
if err != nil {
panic(err)
}
ZeroLocalTCPAddress = maddr
}
func RandTestKeyPair(bits int) (ci.PrivKey, ci.PubKey, error) {
return ci.GenerateKeyPairWithReader(ci.RSA, bits, u.NewTimeSeededRand())
}
func SeededTestKeyPair(seed int64) (ci.PrivKey, ci.PubKey, error) {
return ci.GenerateKeyPairWithReader(ci.RSA, 512, u.NewSeededRand(seed))
}
// RandPeerID generates random "valid" peer IDs. It does not NEED to generate
// keys because it is as if we lost the key right away. It is fine to read
// randomness and hash it. To generate proper keys and a corresponding PeerID, use:
// sk, pk, _ := testutil.RandKeyPair()
// id, _ := peer.IDFromPublicKey(pk)
func RandPeerID() (peer.ID, error) {
buf := make([]byte, 16)
if _, err := io.ReadFull(u.NewTimeSeededRand(), buf); err != nil {
return "", err
}
h := u.Hash(buf)
return peer.ID(h), nil
}
func RandPeerIDFatal(t testing.TB) peer.ID {
p, err := RandPeerID()
if err != nil {
t.Fatal(err)
}
return p
}
// RandLocalTCPAddress returns a random multiaddr. It suppresses errors
// for nice composability; do check that the address isn't nil.
//
// Note: for real network tests, use ZeroLocalTCPAddress so the kernel
// assigns an unused TCP port. otherwise you may get clashes. This
// function remains here so that p2p/net/mock (which does not touch the
// real network) can assign different addresses to peers.
func RandLocalTCPAddress() ma.Multiaddr {
// chances are it will work out, but it **might** fail if the port is in use
// most ports above 10000 aren't in use by long running processes, so yay.
// (maybe there should be a range of "loopback" ports that are guaranteed
// to be open for the process, but naturally can only talk to self.)
lastPort.Lock()
if lastPort.port == 0 {
lastPort.port = 10000 + SeededRand.Intn(50000)
}
port := lastPort.port
lastPort.port++
lastPort.Unlock()
addr := fmt.Sprintf("/ip4/127.0.0.1/tcp/%d", port)
maddr, _ := ma.NewMultiaddr(addr)
return maddr
}
var lastPort = struct {
port int
sync.Mutex
}{}
// PeerNetParams is a struct to bundle together the four things
// you need to run a connection with a peer: an ID, two keys, and an address.
type PeerNetParams struct {
ID peer.ID
PrivKey ci.PrivKey
PubKey ci.PubKey
Addr ma.Multiaddr
}
func (p *PeerNetParams) checkKeys() error {
if !p.ID.MatchesPrivateKey(p.PrivKey) {
return errors.New("p.ID does not match p.PrivKey")
}
if !p.ID.MatchesPublicKey(p.PubKey) {
return errors.New("p.ID does not match p.PubKey")
}
buf := new(bytes.Buffer)
buf.Write([]byte("hello world. this is me, I swear."))
b := buf.Bytes()
sig, err := p.PrivKey.Sign(b)
if err != nil {
return fmt.Errorf("sig signing failed: %s", err)
}
sigok, err := p.PubKey.Verify(b, sig)
if err != nil {
return fmt.Errorf("sig verify failed: %s", err)
}
if !sigok {
return fmt.Errorf("sig verify failed: sig invalid")
}
return nil // ok. move along.
}
func RandPeerNetParamsOrFatal(t *testing.T) PeerNetParams {
p, err := RandPeerNetParams()
if err != nil {
t.Fatal(err)
return PeerNetParams{} // TODO return nil
}
return *p
}
func RandPeerNetParams() (*PeerNetParams, error) {
var p PeerNetParams
var err error
p.Addr = ZeroLocalTCPAddress
p.PrivKey, p.PubKey, err = RandTestKeyPair(512)
if err != nil {
return nil, err
}
p.ID, err = peer.IDFromPublicKey(p.PubKey)
if err != nil {
return nil, err
}
if err := p.checkKeys(); err != nil {
return nil, err
}
return &p, nil
}
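// A quick sketch of how these helpers are meant to be used in tests (illustrative):
//
//	func TestSomething(t *testing.T) {
//		p := RandPeerNetParamsOrFatal(t) // matching ID, key pair and zero local addr
//		other := RandPeerIDFatal(t)      // cheap random peer.ID with no real key behind it
//		_, _ = p, other
//	}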
package testutil
import (
"testing"
ci "github.com/ipfs/go-libp2p/p2p/crypto"
peer "github.com/ipfs/go-libp2p/p2p/peer"
ma "github.com/jbenet/go-multiaddr"
)
type Identity interface {
Address() ma.Multiaddr
ID() peer.ID
PrivateKey() ci.PrivKey
PublicKey() ci.PubKey
}
// TODO add a cheaper way to generate identities
func RandIdentity() (Identity, error) {
p, err := RandPeerNetParams()
if err != nil {
return nil, err
}
return &identity{*p}, nil
}
func RandIdentityOrFatal(t *testing.T) Identity {
p, err := RandPeerNetParams()
if err != nil {
t.Fatal(err)
}
return &identity{*p}
}
// identity is a temporary shim to delay binding of PeerNetParams.
type identity struct {
PeerNetParams
}
func (p *identity) ID() peer.ID {
return p.PeerNetParams.ID
}
func (p *identity) Address() ma.Multiaddr {
return p.Addr
}
func (p *identity) PrivateKey() ci.PrivKey {
return p.PrivKey
}
func (p *identity) PublicKey() ci.PubKey {
return p.PubKey
}
package testutil
import "time"
type LatencyConfig struct {
BlockstoreLatency time.Duration
NetworkLatency time.Duration
RoutingLatency time.Duration
}
func (c LatencyConfig) AllInstantaneous() LatencyConfig {
// Could use a zero value but whatever. Consistency of interface
c.NetworkLatency = 0
c.RoutingLatency = 0
c.BlockstoreLatency = 0
return c
}
func (c LatencyConfig) NetworkNYtoSF() LatencyConfig {
c.NetworkLatency = 20 * time.Millisecond
return c
}
func (c LatencyConfig) NetworkIntraDatacenter2014() LatencyConfig {
c.NetworkLatency = 250 * time.Microsecond
return c
}
func (c LatencyConfig) BlockstoreFastSSD2014() LatencyConfig {
const iops = 100000
// time.Second / iops gives ~10µs per op; the previous (1 / iops) * time.Second
// was integer division and always evaluated to zero.
c.BlockstoreLatency = time.Second / iops
return c
}
func (c LatencyConfig) BlockstoreSlowSSD2014() LatencyConfig {
c.BlockstoreLatency = 150 * time.Microsecond
return c
}
func (c LatencyConfig) Blockstore7200RPM() LatencyConfig {
c.BlockstoreLatency = 8 * time.Millisecond
return c
}
func (c LatencyConfig) RoutingSlow() LatencyConfig {
c.RoutingLatency = 200 * time.Millisecond
return c
}
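// The config methods compose by chaining (sketch):
//
//	c := LatencyConfig{}.NetworkNYtoSF().Blockstore7200RPM().RoutingSlow()
//	// c.NetworkLatency == 20ms, c.BlockstoreLatency == 8ms, c.RoutingLatency == 200ms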