Commit 0c73722a authored by Jeromy's avatar Jeromy
Browse files

vendor in notifier

parent d4b42f8e
......@@ -4,11 +4,11 @@ import (
"strings"
"sync"
context "QmacZi9WygGK7Me8mH53pypyscHzU386aUZXpr28GZgUct/context"
ma "QmbWxL1aXQhBjc1XGjGF1f2KGBMCBYSuT2ThA8YXnXJK83/go-multiaddr"
ggio "QmfH4HuZyN1p2wQLWWkXC91Z76435xKrBVfLQ2MY8ayG5R/gogo-protobuf/io"
semver "github.com/coreos/go-semver/semver"
msmux "github.com/whyrusleeping/go-multistream"
context "golang.org/x/net/context"
host "github.com/ipfs/go-libp2p/p2p/host"
mstream "github.com/ipfs/go-libp2p/p2p/metrics/stream"
......
......@@ -9,8 +9,8 @@ import (
identify "github.com/ipfs/go-libp2p/p2p/protocol/identify"
testutil "github.com/ipfs/go-libp2p/p2p/test/util"
context "QmacZi9WygGK7Me8mH53pypyscHzU386aUZXpr28GZgUct/context"
ma "QmbWxL1aXQhBjc1XGjGF1f2KGBMCBYSuT2ThA8YXnXJK83/go-multiaddr"
context "golang.org/x/net/context"
)
func subtestIDService(t *testing.T, postDialWait time.Duration) {
......
......@@ -6,13 +6,13 @@ import (
"io"
"time"
context "golang.org/x/net/context"
context "QmacZi9WygGK7Me8mH53pypyscHzU386aUZXpr28GZgUct/context"
logging "QmWRypnfEwrgH4k93KEHN5hng7VjKYkWmzDYRuTZeh2Mgh/go-log"
host "github.com/ipfs/go-libp2p/p2p/host"
inet "github.com/ipfs/go-libp2p/p2p/net"
peer "github.com/ipfs/go-libp2p/p2p/peer"
u "util"
logging "QmWRypnfEwrgH4k93KEHN5hng7VjKYkWmzDYRuTZeh2Mgh/go-log"
)
var log = logging.Logger("ping")
......
......@@ -4,7 +4,7 @@ import (
"testing"
"time"
context "golang.org/x/net/context"
context "QmacZi9WygGK7Me8mH53pypyscHzU386aUZXpr28GZgUct/context"
peer "github.com/ipfs/go-libp2p/p2p/peer"
netutil "github.com/ipfs/go-libp2p/p2p/test/util"
)
......
......@@ -4,14 +4,14 @@ import (
"io"
"testing"
logging "QmWRypnfEwrgH4k93KEHN5hng7VjKYkWmzDYRuTZeh2Mgh/go-log"
inet "github.com/ipfs/go-libp2p/p2p/net"
protocol "github.com/ipfs/go-libp2p/p2p/protocol"
relay "github.com/ipfs/go-libp2p/p2p/protocol/relay"
testutil "github.com/ipfs/go-libp2p/p2p/test/util"
logging "QmWRypnfEwrgH4k93KEHN5hng7VjKYkWmzDYRuTZeh2Mgh/go-log"
context "QmacZi9WygGK7Me8mH53pypyscHzU386aUZXpr28GZgUct/context"
msmux "github.com/whyrusleeping/go-multistream"
context "golang.org/x/net/context"
)
var log = logging.Logger("relay_test")
......
......@@ -6,14 +6,14 @@ import (
"testing"
"time"
logging "QmWRypnfEwrgH4k93KEHN5hng7VjKYkWmzDYRuTZeh2Mgh/go-log"
host "github.com/ipfs/go-libp2p/p2p/host"
inet "github.com/ipfs/go-libp2p/p2p/net"
peer "github.com/ipfs/go-libp2p/p2p/peer"
protocol "github.com/ipfs/go-libp2p/p2p/protocol"
testutil "github.com/ipfs/go-libp2p/p2p/test/util"
logging "QmWRypnfEwrgH4k93KEHN5hng7VjKYkWmzDYRuTZeh2Mgh/go-log"
context "golang.org/x/net/context"
context "QmacZi9WygGK7Me8mH53pypyscHzU386aUZXpr28GZgUct/context"
u "util"
)
......
......@@ -7,16 +7,16 @@ import (
"testing"
"time"
logging "QmWRypnfEwrgH4k93KEHN5hng7VjKYkWmzDYRuTZeh2Mgh/go-log"
host "github.com/ipfs/go-libp2p/p2p/host"
inet "github.com/ipfs/go-libp2p/p2p/net"
swarm "github.com/ipfs/go-libp2p/p2p/net/swarm"
protocol "github.com/ipfs/go-libp2p/p2p/protocol"
testutil "github.com/ipfs/go-libp2p/p2p/test/util"
u "util"
logging "QmWRypnfEwrgH4k93KEHN5hng7VjKYkWmzDYRuTZeh2Mgh/go-log"
context "QmacZi9WygGK7Me8mH53pypyscHzU386aUZXpr28GZgUct/context"
ps "github.com/jbenet/go-peerstream"
context "golang.org/x/net/context"
)
func init() {
......
......@@ -10,8 +10,8 @@ import (
peer "github.com/ipfs/go-libp2p/p2p/peer"
tu "util/testutil"
context "QmacZi9WygGK7Me8mH53pypyscHzU386aUZXpr28GZgUct/context"
ma "QmbWxL1aXQhBjc1XGjGF1f2KGBMCBYSuT2ThA8YXnXJK83/go-multiaddr"
context "golang.org/x/net/context"
)
func GenSwarmNetwork(t *testing.T, ctx context.Context) *swarm.Network {
......
......@@ -27,6 +27,11 @@
"name": "gogo-protobuf",
"hash": "QmfH4HuZyN1p2wQLWWkXC91Z76435xKrBVfLQ2MY8ayG5R",
"version": "1.0.0"
},
{
"name": "go-notifier",
"hash": "QmUtEiB6DmXs7eLJiwS9YFyTAtptqzaWutxCsjHy7UKEgo",
"version": "1.0.0"
}
],
"language": "go"
......
sudo: false
language: go
go:
- 1.3
- 1.4
script:
- go test -race -cpu=5 -v ./...
The MIT License (MIT)
Copyright (c) 2014 Juan Batiz-Benet
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in
all copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
THE SOFTWARE.
# goprocess - lifecycles in go
[![travisbadge](https://travis-ci.org/jbenet/goprocess.svg)](https://travis-ci.org/jbenet/goprocess)
(Based on https://github.com/jbenet/go-ctxgroup)
- Godoc: https://godoc.org/github.com/jbenet/goprocess
`goprocess` introduces a way to manage process lifecycles in go. It is
much like [go.net/context](https://godoc.org/code.google.com/p/go.net/context)
(it actually uses a Context), but it is more like a Context-WaitGroup hybrid.
`goprocess` is about being able to start and stop units of work, which may
receive `Close` signals from many clients. Think of it like a UNIX process
tree, but inside go.
`goprocess` seeks to minimally affect your objects, so you can use it
with both embedding or composition. At the heart of `goprocess` is the
`Process` interface:
```Go
// Process is the basic unit of work in goprocess. It defines a computation
// with a lifecycle:
// - running (before calling Close),
// - closing (after calling Close at least once),
// - closed (after Close returns, and all teardown has _completed_).
//
// More specifically, it fits this:
//
// p := WithTeardown(tf) // new process is created, it is now running.
// p.AddChild(q) // can register children **before** Closing.
// go p.Close() // blocks until done running teardown func.
// <-p.Closing() // would now return true.
// <-p.childrenDone() // wait on all children to be done
// p.teardown() // runs the user's teardown function tf.
// p.Close() // now returns, with error teardown returned.
// <-p.Closed() // would now return true.
//
// Processes can be arranged in a process "tree", where children are
// automatically Closed if their parents are closed. (Note, it is actually
// a Process DAG, children may have multiple parents). A process may also
// optionally wait for another to fully Close before beginning to Close.
// This makes it easy to ensure order of operations and proper sequential
// teardown of resurces. For example:
//
// p1 := goprocess.WithTeardown(func() error {
// fmt.Println("closing 1")
// })
// p2 := goprocess.WithTeardown(func() error {
// fmt.Println("closing 2")
// })
// p3 := goprocess.WithTeardown(func() error {
// fmt.Println("closing 3")
// })
//
// p1.AddChild(p2)
// p2.AddChild(p3)
//
//
// go p1.Close()
// go p2.Close()
// go p3.Close()
//
// // Output:
// // closing 3
// // closing 2
// // closing 1
//
// Process is modelled after the UNIX processes group idea, and heavily
// informed by sync.WaitGroup and go.net/context.Context.
//
// In the function documentation of this interface, `p` always refers to
// the self Process.
type Process interface {
// WaitFor makes p wait for q before exiting. Thus, p will _always_ close
// _after_ q. Note well: a waiting cycle is deadlock.
//
// If q is already Closed, WaitFor calls p.Close()
// If p is already Closing or Closed, WaitFor panics. This is the same thing
// as calling Add(1) _after_ calling Done() on a wait group. Calling WaitFor
// on an already-closed process is a programming error likely due to bad
// synchronization
WaitFor(q Process)
// AddChildNoWait registers child as a "child" of Process. As in UNIX,
// when parent is Closed, child is Closed -- child may Close beforehand.
// This is the equivalent of calling:
//
// go func(parent, child Process) {
// <-parent.Closing()
// child.Close()
// }(p, q)
//
// Note: the naming of functions is `AddChildNoWait` and `AddChild` (instead
// of `AddChild` and `AddChildWaitFor`) because:
// - it is the more common operation,
// - explicitness is helpful in the less common case (no waiting), and
// - usual "child" semantics imply parent Processes should wait for children.
AddChildNoWait(q Process)
// AddChild is the equivalent of calling:
// parent.AddChildNoWait(q)
// parent.WaitFor(q)
AddChild(q Process)
// Go creates a new process, adds it as a child, and spawns the ProcessFunc f
// in its own goroutine. It is equivalent to:
//
// GoChild(p, f)
//
// It is useful to construct simple asynchronous workers, children of p.
Go(f ProcessFunc) Process
// Close ends the process. Close blocks until the process has completely
// shut down, and any teardown has run _exactly once_. The returned error
// is available indefinitely: calling Close twice returns the same error.
// If the process has already been closed, Close returns immediately.
Close() error
// Closing is a signal to wait upon. The returned channel is closed
// _after_ Close has been called at least once, but teardown may or may
// not be done yet. The primary use case of Closing is for children who
// need to know when a parent is shutting down, and therefore also shut
// down.
Closing() <-chan struct{}
// Closed is a signal to wait upon. The returned channel is closed
// _after_ Close has completed; teardown has finished. The primary use case
// of Closed is waiting for a Process to Close without _causing_ the Close.
Closed() <-chan struct{}
}
```
package goprocessctx
import (
goprocess "QmSir6qPL1tjuxd8LkR8VZq6v625ExAUVs2eCLeqQuaPGU/goprocess"
context "QmacZi9WygGK7Me8mH53pypyscHzU386aUZXpr28GZgUct/context"
)
// WithContext constructs and returns a Process that respects
// given context. It is the equivalent of:
//
// func ProcessWithContext(ctx context.Context) goprocess.Process {
// p := goprocess.WithParent(goprocess.Background())
// CloseAfterContext(p, ctx)
// return p
// }
//
func WithContext(ctx context.Context) goprocess.Process {
p := goprocess.WithParent(goprocess.Background())
CloseAfterContext(p, ctx)
return p
}
// WithContextAndTeardown is a helper function to set teardown at initiation
// of WithContext
func WithContextAndTeardown(ctx context.Context, tf goprocess.TeardownFunc) goprocess.Process {
p := goprocess.WithTeardown(tf)
CloseAfterContext(p, ctx)
return p
}
// WaitForContext makes p WaitFor ctx. When Closing, p waits for
// ctx.Done(), before being Closed(). It is simply:
//
// p.WaitFor(goprocess.WithContext(ctx))
//
func WaitForContext(ctx context.Context, p goprocess.Process) {
p.WaitFor(WithContext(ctx))
}
// CloseAfterContext schedules the process to close after the given
// context is done. It is the equivalent of:
//
// func CloseAfterContext(p goprocess.Process, ctx context.Context) {
// go func() {
// <-ctx.Done()
// p.Close()
// }()
// }
//
func CloseAfterContext(p goprocess.Process, ctx context.Context) {
if p == nil {
panic("nil Process")
}
if ctx == nil {
panic("nil Context")
}
// context.Background(). if ctx.Done() is nil, it will never be done.
// we check for this to avoid wasting a goroutine forever.
if ctx.Done() == nil {
return
}
go func() {
<-ctx.Done()
p.Close()
}()
}
// WithProcessClosing returns a context.Context derived from ctx that
// is cancelled as p is Closing (after: <-p.Closing()). It is simply:
//
// func WithProcessClosing(ctx context.Context, p goprocess.Process) context.Context {
// ctx, cancel := context.WithCancel(ctx)
// go func() {
// <-p.Closing()
// cancel()
// }()
// return ctx
// }
//
func WithProcessClosing(ctx context.Context, p goprocess.Process) context.Context {
ctx, cancel := context.WithCancel(ctx)
go func() {
<-p.Closing()
cancel()
}()
return ctx
}
// WithProcessClosed returns a context.Context that is cancelled
// after Process p is Closed. It is the equivalent of:
//
// func WithProcessClosed(ctx context.Context, p goprocess.Process) context.Context {
// ctx, cancel := context.WithCancel(ctx)
// go func() {
// <-p.Closed()
// cancel()
// }()
// return ctx
// }
//
func WithProcessClosed(ctx context.Context, p goprocess.Process) context.Context {
ctx, cancel := context.WithCancel(ctx)
go func() {
<-p.Closed()
cancel()
}()
return ctx
}
package goprocessctx
import (
"errors"
"time"
goprocess "QmSir6qPL1tjuxd8LkR8VZq6v625ExAUVs2eCLeqQuaPGU/goprocess"
"QmacZi9WygGK7Me8mH53pypyscHzU386aUZXpr28GZgUct/context"
)
const (
closing = iota
closed
)
type procContext struct {
done <-chan struct{}
which int
}
// OnClosingContext derives a context from a given goprocess that will
// be 'Done' when the process is closing
func OnClosingContext(p goprocess.Process) context.Context {
return &procContext{
done: p.Closing(),
which: closing,
}
}
// OnClosedContext derives a context from a given goprocess that will
// be 'Done' when the process is closed
func OnClosedContext(p goprocess.Process) context.Context {
return &procContext{
done: p.Closed(),
which: closed,
}
}
func (c *procContext) Done() <-chan struct{} {
return c.done
}
func (c *procContext) Deadline() (time.Time, bool) {
return time.Time{}, false
}
func (c *procContext) Err() error {
if c.which == closing {
return errors.New("process closing")
} else if c.which == closed {
return errors.New("process closed")
} else {
panic("unrecognized process context type")
}
}
func (c *procContext) Value(key interface{}) interface{} {
return nil
}
package goprocess_test
import (
"fmt"
"time"
"QmSir6qPL1tjuxd8LkR8VZq6v625ExAUVs2eCLeqQuaPGU/goprocess"
)
func ExampleGo() {
p := goprocess.Go(func(p goprocess.Process) {
ticker := time.Tick(200 * time.Millisecond)
for {
select {
case <-ticker:
fmt.Println("tick")
case <-p.Closing():
fmt.Println("closing")
return
}
}
})
<-time.After(1100 * time.Millisecond)
p.Close()
fmt.Println("closed")
<-time.After(100 * time.Millisecond)
// Output:
// tick
// tick
// tick
// tick
// tick
// closing
// closed
}
// Package goprocess introduces a Process abstraction that allows simple
// organization, and orchestration of work. It is much like a WaitGroup,
// and much like a context.Context, but also ensures safe **exactly-once**,
// and well-ordered teardown semantics.
package goprocess
import (
"os"
"os/signal"
)
// Process is the basic unit of work in goprocess. It defines a computation
// with a lifecycle:
// - running (before calling Close),
// - closing (after calling Close at least once),
// - closed (after Close returns, and all teardown has _completed_).
//
// More specifically, it fits this:
//
// p := WithTeardown(tf) // new process is created, it is now running.
// p.AddChild(q) // can register children **before** Closed().
// go p.Close() // blocks until done running teardown func.
// <-p.Closing() // would now return true.
// <-p.childrenDone() // wait on all children to be done
// p.teardown() // runs the user's teardown function tf.
// p.Close() // now returns, with error teardown returned.
// <-p.Closed() // would now return true.
//
// Processes can be arranged in a process "tree", where children are
// automatically Closed if their parents are closed. (Note, it is actually
// a Process DAG, children may have multiple parents). A process may also
// optionally wait for another to fully Close before beginning to Close.
// This makes it easy to ensure order of operations and proper sequential
// teardown of resurces. For example:
//
// p1 := goprocess.WithTeardown(func() error {
// fmt.Println("closing 1")
// })
// p2 := goprocess.WithTeardown(func() error {
// fmt.Println("closing 2")
// })
// p3 := goprocess.WithTeardown(func() error {
// fmt.Println("closing 3")
// })
//
// p1.AddChild(p2)
// p2.AddChild(p3)
//
//
// go p1.Close()
// go p2.Close()
// go p3.Close()
//
// // Output:
// // closing 3
// // closing 2
// // closing 1
//
// Process is modelled after the UNIX processes group idea, and heavily
// informed by sync.WaitGroup and go.net/context.Context.
//
// In the function documentation of this interface, `p` always refers to
// the self Process.
type Process interface {
// WaitFor makes p wait for q before exiting. Thus, p will _always_ close
// _after_ q. Note well: a waiting cycle is deadlock.
//
// If q is already Closed, WaitFor calls p.Close()
// If p is already Closing or Closed, WaitFor panics. This is the same thing
// as calling Add(1) _after_ calling Done() on a wait group. Calling WaitFor
// on an already-closed process is a programming error likely due to bad
// synchronization
WaitFor(q Process)
// AddChildNoWait registers child as a "child" of Process. As in UNIX,
// when parent is Closed, child is Closed -- child may Close beforehand.
// This is the equivalent of calling:
//
// go func(parent, child Process) {
// <-parent.Closing()
// child.Close()
// }(p, q)
//
// Note: the naming of functions is `AddChildNoWait` and `AddChild` (instead
// of `AddChild` and `AddChildWaitFor`) because:
// - it is the more common operation,
// - explicitness is helpful in the less common case (no waiting), and
// - usual "child" semantics imply parent Processes should wait for children.
AddChildNoWait(q Process)
// AddChild is the equivalent of calling:
// parent.AddChildNoWait(q)
// parent.WaitFor(q)
AddChild(q Process)
// Go is much like `go`, as it runs a function in a newly spawned goroutine.
// The neat part of Process.Go is that the Process object you call it on will:
// * construct a child Process, and call AddChild(child) on it
// * spawn a goroutine, and call the given function
// * Close the child when the function exits.
// This way, you can rest assured each goroutine you spawn has its very own
// Process context, and that it will be closed when the function exits.
// It is the function's responsibility to respect the Closing of its Process,
// namely it should exit (return) when <-Closing() is ready. It is basically:
//
// func (p Process) Go(f ProcessFunc) Process {
// child := WithParent(p)
// go func () {
// f(child)
// child.Close()
// }()
// }
//
// It is useful to construct simple asynchronous workers, children of p.
Go(f ProcessFunc) Process
// SetTeardown sets the process's teardown to tf.
SetTeardown(tf TeardownFunc)
// Close ends the process. Close blocks until the process has completely
// shut down, and any teardown has run _exactly once_. The returned error
// is available indefinitely: calling Close twice returns the same error.
// If the process has already been closed, Close returns immediately.
Close() error
// CloseAfterChildren calls Close _after_ its children have Closed
// normally (i.e. it _does not_ attempt to close them).
CloseAfterChildren() error
// Closing is a signal to wait upon. The returned channel is closed
// _after_ Close has been called at least once, but teardown may or may
// not be done yet. The primary use case of Closing is for children who
// need to know when a parent is shutting down, and therefore also shut
// down.
Closing() <-chan struct{}
// Closed is a signal to wait upon. The returned channel is closed
// _after_ Close has completed; teardown has finished. The primary use case
// of Closed is waiting for a Process to Close without _causing_ the Close.
Closed() <-chan struct{}
// Err waits until the process is closed, and then returns any error that
// occurred during shutdown.
Err() error
}
// TeardownFunc is a function used to cleanup state at the end of the
// lifecycle of a Process.
type TeardownFunc func() error
// ProcessFunc is a function that takes a process. Its main use case is goprocess.Go,
// which spawns a ProcessFunc in its own goroutine, and returns a corresponding
// Process object.
type ProcessFunc func(proc Process)
var nilProcessFunc = func(Process) {}
// Go is much like `go`: it runs a function in a newly spawned goroutine. The neat
// part of Go is that it provides Process object to communicate between the
// function and the outside world. Thus, callers can easily WaitFor, or Close the
// function. It is the function's responsibility to respect the Closing of its Process,
// namely it should exit (return) when <-Closing() is ready. It is simply:
//
// func Go(f ProcessFunc) Process {
// p := WithParent(Background())
// p.Go(f)
// return p
// }
//
// Note that a naive implementation of Go like the following would not work:
//
// func Go(f ProcessFunc) Process {
// return Background().Go(f)
// }
//
// This is because having the process you
func Go(f ProcessFunc) Process {
// return GoChild(Background(), f)
// we use two processes, one for communication, and
// one for ensuring we wait on the function (unclosable from the outside).
p := newProcess(nil)
waitFor := newProcess(nil)
p.WaitFor(waitFor) // prevent p from closing
go func() {
f(p)
waitFor.Close() // allow p to close.
p.Close() // ensure p closes.
}()
return p
}
// GoChild is like Go, but it registers the returned Process as a child of parent,
// **before** spawning the goroutine, which ensures proper synchronization with parent.
// It is somewhat like
//
// func GoChild(parent Process, f ProcessFunc) Process {
// p := WithParent(parent)
// p.Go(f)
// return p
// }
//
// And it is similar to the classic WaitGroup use case:
//
// func WaitGroupGo(wg sync.WaitGroup, child func()) {
// wg.Add(1)
// go func() {
// child()
// wg.Done()
// }()
// }
//
func GoChild(parent Process, f ProcessFunc) Process {
p := WithParent(parent)
p.Go(f)
return p
}
// Spawn is an alias of `Go`. In many contexts, Spawn is a
// well-known Process launching word, which fits our use case.
var Spawn = Go
// SpawnChild is an alias of `GoChild`. In many contexts, Spawn is a
// well-known Process launching word, which fits our use case.
var SpawnChild = GoChild
// WithTeardown constructs and returns a Process with a TeardownFunc.
// TeardownFunc tf will be called **exactly-once** when Process is
// Closing, after all Children have fully closed, and before p is Closed.
// In fact, Process p will not be Closed until tf runs and exits.
// See lifecycle in Process doc.
func WithTeardown(tf TeardownFunc) Process {
if tf == nil {
panic("nil tf TeardownFunc")
}
return newProcess(tf)
}
// WithParent constructs and returns a Process with a given parent.
func WithParent(parent Process) Process {
if parent == nil {
panic("nil parent Process")
}
q := newProcess(nil)
parent.AddChild(q)
return q
}
// WithSignals returns a Process that will Close() when any given signal fires.
// This is useful to bind Process trees to syscall.SIGTERM, SIGKILL, etc.
func WithSignals(sig ...os.Signal) Process {
p := WithParent(Background())
c := make(chan os.Signal)
signal.Notify(c, sig...)
go func() {
<-c
signal.Stop(c)
p.Close()
}()
return p
}
// Background returns the "background" Process: a statically allocated
// process that can _never_ close. It also never enters Closing() state.
// Calling Background().Close() will hang indefinitely.
func Background() Process {
return background
}
// background is the background process
var background = &unclosable{Process: newProcess(nil)}
// unclosable is a process that _cannot_ be closed. calling Close simply hangs.
type unclosable struct {
Process
}
func (p *unclosable) Close() error {
var hang chan struct{}
<-hang // hang forever
return nil
}
package goprocess
import (
"fmt"
"runtime"
"syscall"
"testing"
"time"
)
type tree struct {
Process
c []tree
}
func setupHierarchy(p Process) tree {
t := func(n Process, ts ...tree) tree {
return tree{n, ts}
}
a := WithParent(p)
b1 := WithParent(a)
b2 := WithParent(a)
c1 := WithParent(b1)
c2 := WithParent(b1)
c3 := WithParent(b2)
c4 := WithParent(b2)
return t(a, t(b1, t(c1), t(c2)), t(b2, t(c3), t(c4)))
}
func TestClosingClosed(t *testing.T) {
bWait := make(chan struct{})
a := WithParent(Background())
a.Go(func(proc Process) {
<-bWait
})
Q := make(chan string, 3)
go func() {
<-a.Closing()
Q <- "closing"
bWait <- struct{}{}
}()
go func() {
<-a.Closed()
Q <- "closed"
}()
go func() {
a.Close()
Q <- "closed"
}()
if q := <-Q; q != "closing" {
t.Error("order incorrect. closing not first")
}
if q := <-Q; q != "closed" {
t.Error("order incorrect. closing not first")
}
if q := <-Q; q != "closed" {
t.Error("order incorrect. closing not first")
}
}
func TestChildFunc(t *testing.T) {
a := WithParent(Background())
wait1 := make(chan struct{})
wait2 := make(chan struct{})
wait3 := make(chan struct{})
wait4 := make(chan struct{})
a.Go(func(process Process) {
wait1 <- struct{}{}
<-wait2
wait3 <- struct{}{}
})
go func() {
a.Close()
wait4 <- struct{}{}
}()
<-wait1
select {
case <-wait3:
t.Error("should not be closed yet")
case <-wait4:
t.Error("should not be closed yet")
case <-a.Closed():
t.Error("should not be closed yet")
default:
}
wait2 <- struct{}{}
select {
case <-wait3:
case <-time.After(time.Second):
t.Error("should be closed now")
}
select {
case <-wait4:
case <-time.After(time.Second):
t.Error("should be closed now")
}
}
func TestTeardownCalledOnce(t *testing.T) {
a := setupHierarchy(Background())
onlyOnce := func() func() error {
count := 0
return func() error {
count++
if count > 1 {
t.Error("called", count, "times")
}
return nil
}
}
a.SetTeardown(onlyOnce())
a.c[0].SetTeardown(onlyOnce())
a.c[0].c[0].SetTeardown(onlyOnce())
a.c[0].c[1].SetTeardown(onlyOnce())
a.c[1].SetTeardown(onlyOnce())
a.c[1].c[0].SetTeardown(onlyOnce())
a.c[1].c[1].SetTeardown(onlyOnce())
a.c[0].c[0].Close()
a.c[0].c[0].Close()
a.c[0].c[0].Close()
a.c[0].c[0].Close()
a.c[0].Close()
a.c[0].Close()
a.c[0].Close()
a.c[0].Close()
a.Close()
a.Close()
a.Close()
a.Close()
a.c[1].Close()
a.c[1].Close()
a.c[1].Close()
a.c[1].Close()
}
func TestOnClosedAll(t *testing.T) {
Q := make(chan string, 10)
p := WithParent(Background())
a := setupHierarchy(p)
go onClosedStr(Q, "0", a.c[0])
go onClosedStr(Q, "10", a.c[1].c[0])
go onClosedStr(Q, "", a)
go onClosedStr(Q, "00", a.c[0].c[0])
go onClosedStr(Q, "1", a.c[1])
go onClosedStr(Q, "01", a.c[0].c[1])
go onClosedStr(Q, "11", a.c[1].c[1])
go p.Close()
testStrs(t, Q, "00", "01", "10", "11", "0", "1", "")
testStrs(t, Q, "00", "01", "10", "11", "0", "1", "")
testStrs(t, Q, "00", "01", "10", "11", "0", "1", "")
testStrs(t, Q, "00", "01", "10", "11", "0", "1", "")
testStrs(t, Q, "00", "01", "10", "11", "0", "1", "")
testStrs(t, Q, "00", "01", "10", "11", "0", "1", "")
}
func TestOnClosedLeaves(t *testing.T) {
Q := make(chan string, 10)
p := WithParent(Background())
a := setupHierarchy(p)
go onClosedStr(Q, "0", a.c[0])
go onClosedStr(Q, "10", a.c[1].c[0])
go onClosedStr(Q, "", a)
go onClosedStr(Q, "00", a.c[0].c[0])
go onClosedStr(Q, "1", a.c[1])
go onClosedStr(Q, "01", a.c[0].c[1])
go onClosedStr(Q, "11", a.c[1].c[1])
go a.c[0].Close()
testStrs(t, Q, "00", "01", "0")
testStrs(t, Q, "00", "01", "0")
testStrs(t, Q, "00", "01", "0")
go a.c[1].Close()
testStrs(t, Q, "10", "11", "1")
testStrs(t, Q, "10", "11", "1")
testStrs(t, Q, "10", "11", "1")
go p.Close()
testStrs(t, Q, "")
}
func TestWaitFor(t *testing.T) {
Q := make(chan string, 5)
a := WithParent(Background())
b := WithParent(Background())
c := WithParent(Background())
d := WithParent(Background())
e := WithParent(Background())
go onClosedStr(Q, "a", a)
go onClosedStr(Q, "b", b)
go onClosedStr(Q, "c", c)
go onClosedStr(Q, "d", d)
go onClosedStr(Q, "e", e)
testNone(t, Q)
a.WaitFor(b)
a.WaitFor(c)
b.WaitFor(d)
e.WaitFor(d)
testNone(t, Q)
go a.Close() // should do nothing.
testNone(t, Q)
go e.Close()
testNone(t, Q)
d.Close()
testStrs(t, Q, "d", "e")
testStrs(t, Q, "d", "e")
c.Close()
testStrs(t, Q, "c")
b.Close()
testStrs(t, Q, "a", "b")
testStrs(t, Q, "a", "b")
}
func TestAddChildNoWait(t *testing.T) {
Q := make(chan string, 5)
a := WithParent(Background())
b := WithParent(Background())
c := WithParent(Background())
d := WithParent(Background())
e := WithParent(Background())
go onClosedStr(Q, "a", a)
go onClosedStr(Q, "b", b)
go onClosedStr(Q, "c", c)
go onClosedStr(Q, "d", d)
go onClosedStr(Q, "e", e)
testNone(t, Q)
a.AddChildNoWait(b)
a.AddChildNoWait(c)
b.AddChildNoWait(d)
e.AddChildNoWait(d)
testNone(t, Q)
b.Close()
testStrs(t, Q, "b", "d")
testStrs(t, Q, "b", "d")
a.Close()
testStrs(t, Q, "a", "c")
testStrs(t, Q, "a", "c")
e.Close()
testStrs(t, Q, "e")
}
func TestAddChild(t *testing.T) {
a := WithParent(Background())
b := WithParent(Background())
c := WithParent(Background())
d := WithParent(Background())
e := WithParent(Background())
Q := make(chan string, 5)
go onClosedStr(Q, "a", a)
go onClosedStr(Q, "b", b)
go onClosedStr(Q, "c", c)
go onClosedStr(Q, "d", d)
go onClosedStr(Q, "e", e)
testNone(t, Q)
a.AddChild(b)
a.AddChild(c)
b.AddChild(d)
e.AddChild(d)
testNone(t, Q)
go b.Close()
d.Close()
testStrs(t, Q, "b", "d")
testStrs(t, Q, "b", "d")
go a.Close()
c.Close()
testStrs(t, Q, "a", "c")
testStrs(t, Q, "a", "c")
e.Close()
testStrs(t, Q, "e")
}
func TestGoChildrenClose(t *testing.T) {
var a, b, c, d, e Process
var ready = make(chan struct{})
var bWait = make(chan struct{})
var cWait = make(chan struct{})
var dWait = make(chan struct{})
var eWait = make(chan struct{})
a = WithParent(Background())
a.Go(func(p Process) {
b = p
b.Go(func(p Process) {
c = p
ready <- struct{}{}
<-cWait
})
ready <- struct{}{}
<-bWait
})
a.Go(func(p Process) {
d = p
d.Go(func(p Process) {
e = p
ready <- struct{}{}
<-eWait
})
ready <- struct{}{}
<-dWait
})
<-ready
<-ready
<-ready
<-ready
Q := make(chan string, 5)
go onClosedStr(Q, "a", a)
go onClosedStr(Q, "b", b)
go onClosedStr(Q, "c", c)
go onClosedStr(Q, "d", d)
go onClosedStr(Q, "e", e)
testNone(t, Q)
go a.Close()
testNone(t, Q)
bWait <- struct{}{} // relase b
go b.Close()
testNone(t, Q)
cWait <- struct{}{} // relase c
<-c.Closed()
<-b.Closed()
testStrs(t, Q, "b", "c")
testStrs(t, Q, "b", "c")
eWait <- struct{}{} // release e
<-e.Closed()
testStrs(t, Q, "e")
dWait <- struct{}{} // releasse d
<-d.Closed()
<-a.Closed()
testStrs(t, Q, "a", "d")
testStrs(t, Q, "a", "d")
}
func TestCloseAfterChildren(t *testing.T) {
var a, b, c, d, e Process
var ready = make(chan struct{})
a = WithParent(Background())
a.Go(func(p Process) {
b = p
b.Go(func(p Process) {
c = p
ready <- struct{}{}
<-p.Closing() // wait till we're told to close (parents mustnt)
})
ready <- struct{}{}
// <-p.Closing() // will CloseAfterChildren
})
a.Go(func(p Process) {
d = p
d.Go(func(p Process) {
e = p
ready <- struct{}{}
<-p.Closing() // wait till we're told to close (parents mustnt)
})
ready <- struct{}{}
<-p.Closing()
})
<-ready
<-ready
<-ready
<-ready
Q := make(chan string, 5)
go onClosedStr(Q, "a", a)
go onClosedStr(Q, "b", b)
go onClosedStr(Q, "c", c)
go onClosedStr(Q, "d", d)
go onClosedStr(Q, "e", e)
aDone := make(chan struct{})
bDone := make(chan struct{})
t.Log("test none when waiting on a")
testNone(t, Q)
go func() {
a.CloseAfterChildren()
aDone <- struct{}{}
}()
testNone(t, Q)
t.Log("test none when waiting on b")
go func() {
b.CloseAfterChildren()
bDone <- struct{}{}
}()
testNone(t, Q)
c.Close()
<-bDone
<-b.Closed()
testStrs(t, Q, "b", "c")
testStrs(t, Q, "b", "c")
e.Close()
testStrs(t, Q, "e")
d.Close()
<-aDone
<-a.Closed()
testStrs(t, Q, "a", "d")
testStrs(t, Q, "a", "d")
}
func TestGoClosing(t *testing.T) {
var ready = make(chan struct{})
a := WithParent(Background())
a.Go(func(p Process) {
// this should be fine.
a.Go(func(p Process) {
ready <- struct{}{}
})
// set a to close. should not fully close until after this func returns.
go a.Close()
// wait until a is marked as closing
<-a.Closing()
// this should also be fine.
a.Go(func(p Process) {
select {
case <-p.Closing():
// p should be marked as closing
default:
t.Error("not marked closing when it should be.")
}
ready <- struct{}{}
})
ready <- struct{}{}
})
<-ready
<-ready
<-ready
}
func TestBackground(t *testing.T) {
// test it hangs indefinitely:
b := Background()
go b.Close()
select {
case <-b.Closing():
t.Error("b.Closing() closed :(")
default:
}
}
func TestWithSignals(t *testing.T) {
p := WithSignals(syscall.SIGABRT)
testNotClosed(t, p)
syscall.Kill(syscall.Getpid(), syscall.SIGABRT)
testClosed(t, p)
}
func TestMemoryLeak(t *testing.T) {
iters := 100
fanout := 10
P := newProcess(nil)
var memories []float32
measure := func(str string) float32 {
s := new(runtime.MemStats)
runtime.ReadMemStats(s)
//fmt.Printf("%d ", s.HeapObjects)
//fmt.Printf("%d ", len(P.children))
//fmt.Printf("%d ", runtime.NumGoroutine())
//fmt.Printf("%s: %dk\n", str, s.HeapAlloc/1000)
return float32(s.HeapAlloc) / 1000
}
spawn := func() []Process {
var ps []Process
// Spawn processes
for i := 0; i < fanout; i++ {
p := WithParent(P)
ps = append(ps, p)
for i := 0; i < fanout; i++ {
p2 := WithParent(p)
ps = append(ps, p2)
for i := 0; i < fanout; i++ {
p3 := WithParent(p2)
ps = append(ps, p3)
}
}
}
return ps
}
// Read initial memory stats
measure("initial")
for i := 0; i < iters; i++ {
ps := spawn()
//measure("alloc") // read after alloc
// Close all processes
for _, p := range ps {
p.Close()
<-p.Closed()
}
ps = nil
//measure("dealloc") // read after dealloc, but before gc
// wait until all/most goroutines finish
<-time.After(time.Millisecond)
// Run GC
runtime.GC()
memories = append(memories, measure("gc")) // read after gc
}
memoryInit := memories[10]
percentGrowth := 100 * (memories[len(memories)-1] - memoryInit) / memoryInit
fmt.Printf("Memory growth after %d iteration with each %d processes: %.2f%% after %dk\n", iters, fanout*fanout*fanout, percentGrowth, int(memoryInit))
}
func testClosing(t *testing.T, p Process) {
select {
case <-p.Closing():
case <-time.After(50 * time.Millisecond):
t.Fatal("should be closing")
}
}
func testNotClosing(t *testing.T, p Process) {
select {
case <-p.Closing():
t.Fatal("should not be closing")
case <-p.Closed():
t.Fatal("should not be closed")
default:
}
}
func testClosed(t *testing.T, p Process) {
select {
case <-p.Closed():
case <-time.After(50 * time.Millisecond):
t.Fatal("should be closed")
}
}
func testNotClosed(t *testing.T, p Process) {
select {
case <-p.Closed():
t.Fatal("should not be closed")
case <-time.After(50 * time.Millisecond):
}
}
func testNone(t *testing.T, c <-chan string) {
select {
case out := <-c:
t.Fatal("none should be closed", out)
default:
}
}
func testStrs(t *testing.T, Q <-chan string, ss ...string) {
s1 := <-Q
for _, s2 := range ss {
if s1 == s2 {
return
}
}
t.Error("context not in group:", s1, ss)
}
func onClosedStr(Q chan<- string, s string, p Process) {
<-p.Closed()
Q <- s
}
package goprocess
import (
"sync"
)
// process implements Process
type process struct {
children map[*processLink]struct{} // process to close with us
waitfors map[*processLink]struct{} // process to only wait for
waiters []*processLink // processes that wait for us. for gc.
teardown TeardownFunc // called to run the teardown logic.
waiting chan struct{} // closed when CloseAfterChildrenClosed is called.
closing chan struct{} // closed once close starts.
closed chan struct{} // closed once close is done.
closeErr error // error to return to clients of Close()
sync.Mutex
}
// newProcess constructs and returns a Process.
// It will call tf TeardownFunc exactly once:
// **after** all children have fully Closed,
// **after** entering <-Closing(), and
// **before** <-Closed().
func newProcess(tf TeardownFunc) *process {
return &process{
teardown: tf,
closed: make(chan struct{}),
closing: make(chan struct{}),
waitfors: make(map[*processLink]struct{}),
children: make(map[*processLink]struct{}),
}
}
func (p *process) WaitFor(q Process) {
if q == nil {
panic("waiting for nil process")
}
p.Lock()
select {
case <-p.Closed():
panic("Process cannot wait after being closed")
default:
}
pl := newProcessLink(p, q)
p.waitfors[pl] = struct{}{}
p.Unlock()
go pl.AddToChild()
}
func (p *process) AddChildNoWait(child Process) {
if child == nil {
panic("adding nil child process")
}
p.Lock()
select {
case <-p.Closed():
panic("Process cannot add children after being closed")
case <-p.Closing():
go child.Close()
default:
}
pl := newProcessLink(p, child)
p.children[pl] = struct{}{}
p.Unlock()
go pl.AddToChild()
}
func (p *process) AddChild(child Process) {
if child == nil {
panic("adding nil child process")
}
p.Lock()
select {
case <-p.Closed():
panic("Process cannot add children after being closed")
case <-p.Closing():
go child.Close()
default:
}
pl := newProcessLink(p, child)
if p.waitfors != nil { // if p.waitfors hasn't been set nil
p.waitfors[pl] = struct{}{}
}
if p.children != nil { // if p.children hasn't been set nil
p.children[pl] = struct{}{}
}
p.Unlock()
go pl.AddToChild()
}
func (p *process) Go(f ProcessFunc) Process {
child := newProcess(nil)
waitFor := newProcess(nil)
child.WaitFor(waitFor) // prevent child from closing
// add child last, to prevent a closing parent from
// closing all of them prematurely, before running the func.
p.AddChild(child)
go func() {
f(child)
waitFor.Close() // allow child to close.
child.CloseAfterChildren() // close to tear down.
}()
return child
}
// SetTeardown to assign a teardown function
func (p *process) SetTeardown(tf TeardownFunc) {
if tf == nil {
panic("cannot set nil TeardownFunc")
}
p.Lock()
if p.teardown != nil {
panic("cannot SetTeardown twice")
}
p.teardown = tf
select {
case <-p.Closed():
p.closeErr = tf()
default:
}
p.Unlock()
}
// Close is the external close function.
// it's a wrapper around internalClose that waits on Closed()
func (p *process) Close() error {
p.Lock()
// if already closing, or closed, get out. (but wait!)
select {
case <-p.Closing():
p.Unlock()
<-p.Closed()
return p.closeErr
default:
}
p.doClose()
p.Unlock()
return p.closeErr
}
func (p *process) Closing() <-chan struct{} {
return p.closing
}
func (p *process) Closed() <-chan struct{} {
return p.closed
}
func (p *process) Err() error {
<-p.Closed()
return p.closeErr
}
// the _actual_ close process.
func (p *process) doClose() {
// this function is only be called once (protected by p.Lock()).
// and it will panic (on closing channels) otherwise.
close(p.closing) // signal that we're shutting down (Closing)
for len(p.children) > 0 || len(p.waitfors) > 0 {
for plc, _ := range p.children {
child := plc.Child()
if child != nil { // check because child may already have been removed.
go child.Close() // force all children to shut down
}
plc.ParentClear()
}
p.children = nil // clear them. release memory.
// we must be careful not to iterate over waitfors directly, as it may
// change under our feet.
wf := p.waitfors
p.waitfors = nil // clear them. release memory.
for w, _ := range wf {
// Here, we wait UNLOCKED, so that waitfors who are in the middle of
// adding a child to us can finish. we will immediately close the child.
p.Unlock()
<-w.ChildClosed() // wait till all waitfors are fully closed (before teardown)
p.Lock()
w.ParentClear()
}
}
if p.teardown != nil {
p.closeErr = p.teardown() // actually run the close logic (ok safe to teardown)
}
close(p.closed) // signal that we're shut down (Closed)
// go remove all the parents from the process links. optimization.
go func(waiters []*processLink) {
for _, pl := range waiters {
pl.ClearChild()
pr, ok := pl.Parent().(*process)
if !ok {
// parent has already been called to close
continue
}
pr.Lock()
delete(pr.waitfors, pl)
delete(pr.children, pl)
pr.Unlock()
}
}(p.waiters) // pass in so
p.waiters = nil // clear them. release memory.
}
// We will only wait on the children we have now.
// We will not wait on children added subsequently.
// this may change in the future.
func (p *process) CloseAfterChildren() error {
p.Lock()
select {
case <-p.Closed():
p.Unlock()
return p.Close() // get error. safe, after p.Closed()
case <-p.waiting: // already called it.
p.Unlock()
<-p.Closed()
return p.Close() // get error. safe, after p.Closed()
default:
}
p.Unlock()
// here only from one goroutine.
nextToWaitFor := func() Process {
p.Lock()
defer p.Unlock()
for e, _ := range p.waitfors {
c := e.Child()
if c == nil {
continue
}
select {
case <-c.Closed():
default:
return c
}
}
return nil
}
// wait for all processes we're waiting for are closed.
// the semantics here are simple: we will _only_ close
// if there are no processes currently waiting for.
for next := nextToWaitFor(); next != nil; next = nextToWaitFor() {
<-next.Closed()
}
// YAY! we're done. close
return p.Close()
}
package goprocess
import (
"sync"
)
// closedCh is an alread-closed channel. used to return
// in cases where we already know a channel is closed.
var closedCh chan struct{}
func init() {
closedCh = make(chan struct{})
close(closedCh)
}
// a processLink is an internal bookkeeping datastructure.
// it's used to form a relationship between two processes.
// It is mostly for keeping memory usage down (letting
// children close and be garbage-collected).
type processLink struct {
// guards all fields.
// DO NOT HOLD while holding process locks.
// it may be slow, and could deadlock if not careful.
sync.Mutex
parent Process
child Process
}
func newProcessLink(p, c Process) *processLink {
return &processLink{
parent: p,
child: c,
}
}
// Closing returns whether the child is closing
func (pl *processLink) ChildClosing() <-chan struct{} {
// grab a hold of it, and unlock, as .Closing may block.
pl.Lock()
child := pl.child
pl.Unlock()
if child == nil { // already closed? memory optimization.
return closedCh
}
return child.Closing()
}
func (pl *processLink) ChildClosed() <-chan struct{} {
// grab a hold of it, and unlock, as .Closed may block.
pl.Lock()
child := pl.child
pl.Unlock()
if child == nil { // already closed? memory optimization.
return closedCh
}
return child.Closed()
}
func (pl *processLink) ChildClose() {
// grab a hold of it, and unlock, as .Closed may block.
pl.Lock()
child := pl.child
pl.Unlock()
if child != nil { // already closed? memory optimization.
child.Close()
}
}
func (pl *processLink) ClearChild() {
pl.Lock()
pl.child = nil
pl.Unlock()
}
func (pl *processLink) ParentClear() {
pl.Lock()
pl.parent = nil
pl.Unlock()
}
func (pl *processLink) Child() Process {
pl.Lock()
defer pl.Unlock()
return pl.child
}
func (pl *processLink) Parent() Process {
pl.Lock()
defer pl.Unlock()
return pl.parent
}
func (pl *processLink) AddToChild() {
cp := pl.Child()
// is it a *process ? if not... panic.
c, ok := cp.(*process)
if !ok {
panic("goprocess does not yet support other process impls.")
}
// first, is it Closed?
c.Lock()
select {
case <-c.Closed():
c.Unlock()
// already closed. must not add.
// we must clear it, though. do so without the lock.
pl.ClearChild()
return
default:
// put the process link into q's waiters
c.waiters = append(c.waiters, pl)
c.Unlock()
}
}
{
"name": "goprocess",
"author": "whyrusleeping",
"version": "1.0.0",
"gxDependencies": [
{
"name": "context",
"hash": "QmacZi9WygGK7Me8mH53pypyscHzU386aUZXpr28GZgUct",
"version": "1.0.0"
}
],
"language": "go",
"gx": {
"dvcsimport": "github.com/jbenet/goprocess"
}
}
\ No newline at end of file
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment