Commit 8acc21e8 authored by Jeromy

Vendor in go-peerstream

parent a9de494f
Copyright 2013 Alan Shreve
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
# muxado - Stream multiplexing for Go
## What is stream multiplexing?
Imagine you have a single stream (a bi-directional stream of bytes), like a TCP connection. Stream multiplexing
is a method for transmitting multiple simultaneous streams over that one underlying transport stream.
## What is muxado?
muxado is an implementation of a stream multiplexing library in Go that can be layered on top of a net.Conn to multiplex that stream.
muxado's protocol is not currently documented explicitly, but it is very nearly an implementation of the HTTP2
framing layer with all of the HTTP-specific bits removed. It is heavily inspired by HTTP2, SPDY, and WebMUX.
## How does it work?
Simplifying a bit, muxado chunks the data sent over each multiplexed stream and transmits each piece
as a "frame" over the transport stream, often interleaving frames for several streams.
The remote endpoint then reassembles the frames into distinct streams
of data which are presented to the application layer.
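To make the framing concrete, here is a simplified sketch (an illustration only, not muxado's exact wire format; it assumes the standard library's encoding/binary package) of wrapping one chunk of stream data in a frame before it is written to the transport:

// frameChunk prefixes a chunk of stream data with a small header
// (payload length, flags, frame type, stream id), the way a data frame does.
func frameChunk(streamId uint32, chunk []byte, fin bool) []byte {
	header := make([]byte, 8)
	binary.BigEndian.PutUint16(header[0:2], uint16(len(chunk))) // payload length
	if fin {
		header[2] = 1 // flags: half-close the stream after this chunk
	}
	header[3] = 0 // frame type: stream data
	binary.BigEndian.PutUint32(header[4:8], streamId) // which stream this chunk belongs to
	return append(header, chunk...)
}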
## What good is it anyways?
A stream multiplexing library is a powerful tool for an application developer's toolbox which solves a number of problems:
- It allows developers to implement asynchronous/pipelined protocols with ease. Instead of matching requests with responses in your protocols, just open a new stream for each request and communicate over that.
- muxado can do application-level keep-alives and dead-session detection so that you don't have to write heartbeat code ever again.
- You never need to build connection pools for services running your protocol. You can open as many independent, concurrent streams as you need without incurring any round-trip latency costs.
- muxado allows the server to initiate new streams to clients which is normally very difficult without NAT-busting trickery.
## Show me the code!
As much as possible, the muxado library strives to look and feel just like the standard library's net package. Here's how you initiate a new client session:
sess, err := muxado.DialTLS("tcp", "example.com:1234", tlsConfig)
And a server:
l, err := muxado.ListenTLS("tcp", ":1234", tlsConfig)
for {
	sess, err := l.Accept()
	if err != nil {
		panic(err)
	}
	go handleSession(sess)
}
Once you have a session, you can open new streams on it:
stream, err := sess.Open()
And accept streams opened by the remote side:
stream, err := sess.Accept()
Streams satisfy the net.Conn interface, so they're very familiar to work with:
n, err := stream.Write(buf)
n, err = stream.Read(buf)
muxado sessions and streams implement the net.Listener and net.Conn interfaces (with a small shim), so you can use them with existing golang libraries!
sess, err := muxado.DialTLS("tcp", "example.com:1234", tlsConfig)
http.Serve(sess.NetListener(), handler)
## A more extensive muxado client
// open a new session to a remote endpoint
sess, err := muxado.Dial("tcp", "example.com:1234")
if err != nil {
	panic(err)
}
// handle streams initiated by the server
go func() {
	for {
		stream, err := sess.Accept()
		if err != nil {
			panic(err)
		}
		go handleStream(stream)
	}
}()
// open new streams for application requests
for req := range requests {
	str, err := sess.Open()
	if err != nil {
		panic(err)
	}
	// serialize the request before handing it to the goroutine so we
	// don't race on the loop variable
	payload := req.serialize()
	go func(stream muxado.Stream) {
		defer stream.Close()
		// send request
		if _, err := stream.Write(payload); err != nil {
			panic(err)
		}
		// read response
		buf, err := ioutil.ReadAll(stream)
		if err != nil {
			panic(err)
		}
		handleResponse(buf)
	}(str)
}
## How did you build it?
muxado is a modified implementation of the HTTP2 framing protocol with all of the HTTP-specific bits removed. It aims
for simplicity in the protocol by removing everything that is not core to multiplexing streams. The muxado code
is also written with performance in mind, within the bounds of what pure Go allows. As a result,
muxado does contain some unidiomatic code.
## API documentation
API documentation is available on godoc.org:
[muxado API documentation](https://godoc.org/github.com/inconshreveable/muxado)
## What are its biggest drawbacks?
Any stream-multiplexing library over TCP will suffer from head-of-line blocking: when a packet is dropped, every
multiplexed stream stalls until it is retransmitted, not just the stream whose data was lost.
muxado is also a poor choice when sending large payloads and raw throughput is the priority.
It shines best when the application workload needs to quickly open a large number of small-payload streams.
## Status
Most of muxado's features are implemented (and tested!), but there are many that are still rough or could be improved. See the TODO file for suggestions on what needs improvement.
## License
Apache
- improve the formatting of the docs to look nice for godoc
- use a better example in the docs first before showing the clever integration with the net.Listener/net.Conn APIs
- Make all errors support Temporary() API so applications can better decide what to do
- Handle case of running out of stream ids + test
- writeFrame errors should kill the session, but only if it's not a timeout + test
- Short read should cause an error + test
- Decrement() in outBuffer needs to have deadline support

Extensions:
- Heartbeat extension needs tests
- Make extensions a public API instead of a private API
- Document how extensions work
- Don't include any extensions by default

- heartbeat test
- Finish writing buffer tests
- Write stress test
- Write multi-frame write test
- More session tests
- More stream tests
- Write frame/transport tests - verify read correct type, verify unknown type causes error, verify ioerror is propagated
- Write frame/syn tests
- Write frame/goaway tests
### Low priority:
- Add the ability to differentiate stream errors which allow you to safely retry
- Decide what to do if the application isn't handling its accepted streams fast enough. Refuse stream? Wait and block reading more frames?
- Figure out whether to die with/without lock - in GoAway/OpenStream
- Add priority APIs to stream
- Add priority extension
- Add Reset() stream API
- Eliminate unlikely race on s.remoteDebug between handleFrame() and die()
- Should writeFrame calls for rst/wndinc set the write deadline?
- don't send reset if the stream is fully closed
- include muxado pun somewhere in the docs
package muxado
import (
"QmfEm573cZeq3LpgccZMpngV6dXbm5gfU23F5nNUuhSxxJ/muxado/proto"
"QmfEm573cZeq3LpgccZMpngV6dXbm5gfU23F5nNUuhSxxJ/muxado/proto/frame"
)
// streamAdaptor recasts the types of some function calls by the proto/Stream implementation
// so that it satisfies the public interface
type streamAdaptor struct {
proto.IStream
}
func (a *streamAdaptor) Id() StreamId {
return StreamId(a.IStream.Id())
}
func (a *streamAdaptor) StreamType() StreamType {
return StreamType(a.IStream.StreamType())
}
func (a *streamAdaptor) Session() Session {
return &sessionAdaptor{a.IStream.Session()}
}
// sessionAdaptor recasts the types of some function calls by the proto/Session implementation
// so that it satisfies the public interface
type sessionAdaptor struct {
proto.ISession
}
func (a *sessionAdaptor) Accept() (Stream, error) {
str, err := a.ISession.Accept()
return &streamAdaptor{str}, err
}
func (a *sessionAdaptor) Open() (Stream, error) {
str, err := a.ISession.Open()
return &streamAdaptor{str}, err
}
func (a *sessionAdaptor) OpenStream(priority StreamPriority, streamType StreamType, fin bool) (Stream, error) {
str, err := a.ISession.OpenStream(frame.StreamPriority(priority), frame.StreamType(streamType), fin)
return &streamAdaptor{str}, err
}
func (a *sessionAdaptor) GoAway(code ErrorCode, debug []byte) error {
return a.ISession.GoAway(frame.ErrorCode(code), debug)
}
func (a *sessionAdaptor) Wait() (ErrorCode, error, []byte) {
code, err, debug := a.ISession.Wait()
return ErrorCode(code), err, debug
}
package muxado
import (
"QmfEm573cZeq3LpgccZMpngV6dXbm5gfU23F5nNUuhSxxJ/muxado/proto"
"QmfEm573cZeq3LpgccZMpngV6dXbm5gfU23F5nNUuhSxxJ/muxado/proto/ext"
"crypto/tls"
"net"
)
// Client returns a new muxado client-side connection using conn as the transport.
func Client(conn net.Conn) Session {
return &sessionAdaptor{proto.NewSession(conn, proto.NewStream, true, []proto.Extension{ext.NewDefaultHeartbeat()})}
}
// Dial opens a new connection to the given network/address and then begins a muxado client session on it.
func Dial(network, addr string) (sess Session, err error) {
conn, err := net.Dial(network, addr)
if err != nil {
return
}
return Client(conn), nil
}
// DialTLS opens a new TLS encrypted connection with the given configuration
// to the network/address and then begins a muxado client session on it.
func DialTLS(network, addr string, tlsConfig *tls.Config) (sess Session, err error) {
conn, err := tls.Dial(network, addr, tlsConfig)
if err != nil {
return
}
return Client(conn), nil
}
// muxado is an implementation of a general-purpose stream-multiplexing protocol.
//
// muxado allows client applications to multiplex a single stream-oriented connection,
// like a TCP connection, and communicate over many streams on top of it. muxado accomplishes
// this by chunking data sent over each stream into frames and then reassembling the
// frames and buffering the data before being passed up to the application
// layer on the other side.
//
// muxado is very nearly an exact implementation of the HTTP2 framing layer while leaving out all
// the HTTP-specific parts. It is heavily inspired by HTTP2/SPDY/WebMUX.
//
// muxado's documentation uses the following terms consistently for easier communication:
// - "a transport" is an underlying stream (typically TCP) over which frames are sent between
// endpoints
// - "a stream" is any of the full-duplex byte-streams multiplexed over the transport
// - "a session" refers to an instance of the muxado protocol running over a transport between
// two endpoints
//
// Perhaps the best part of muxado is the interface exposed to client libraries. Since new
// streams may be initiated by both sides at any time, a muxado.Session implements the net.Listener
// interface (almost! Go unfortunately doesn't support covariant interface satisfaction so there's
// a shim). Each muxado stream implements the net.Conn interface. This allows you to integrate
// muxado into existing code which works with these interfaces (which is most Golang networking code)
// with very little difficulty. Consider the following toy example: we'll initiate a new secure
// connection to a server, ask it which application it wants via an HTTP request over a muxado stream,
// and then serve an entire HTTP application *to the server*.
//
//
// sess, err := muxado.DialTLS("tcp", "example.com:1234", new(tls.Config))
// client := &http.Client{Transport: &http.Transport{Dial: sess.NetDial}}
// resp, err := client.Get("http://example.com/appchoice")
// switch getChoice(resp.Body) {
// case "foo":
// http.Serve(sess.NetListener(), fooHandler)
// case "bar":
// http.Serve(sess.NetListener(), barHandler)
// }
//
//
// In addition to enabling multiple streams over a single connection, muxado enables other
// behaviors which can be useful to the application layer:
// - Both sides of a muxado session may initiate new streams
// - muxado can transparently run application-level heartbeats and timeout dead sessions
// - When connections fail, muxado indicates to the application which streams may be safely retried
// - muxado supports prioritizing streams to maximize useful throughput when bandwidth-constrained
//
// A few examples of what these capabilities might make muxado useful for:
// - eliminating custom async/pipelining code for your protocols
// - eliminating connection pools in your protocols
// - eliminating custom NAT traversal logic for enabling server-initiated streams
//
// muxado has been tuned to be very performant within the limits of what you can expect of pure-Go code.
// Some of muxado's code looks unidiomatic in the quest for better performance (locks instead of channels, avoiding
// heap allocations in hot paths, etc.). muxado will typically outperform TCP connections when rapidly initiating many new
// streams with small payloads. When sending a large payload over a single stream, muxado's worst case, it can
// be 2-3x slower and does not parallelize well.
package muxado
package muxado
import (
"QmfEm573cZeq3LpgccZMpngV6dXbm5gfU23F5nNUuhSxxJ/muxado/proto/frame"
"net"
"time"
)
type StreamId frame.StreamId
type StreamPriority frame.StreamPriority
type StreamType frame.StreamType
type ErrorCode frame.ErrorCode
// Stream is a full duplex stream-oriented connection that is multiplexed over a Session.
// Stream implements the net.Conn interface.
type Stream interface {
// Write writes the bytes in the given buffer to the stream
Write([]byte) (int, error)
// Read reads the next bytes on the stream into the given buffer
Read([]byte) (int, error)
// Close closes the stream. It attempts to behave as Close does for a TCP conn in that it
// half-closes the stream for sending, and it will send an RST if any more data is received
// from the remote side.
Close() error
// SetDeadline sets a time after which future Read and Write operations will fail.
SetDeadline(time.Time) error
// SetReadDeadline sets a time after which future Read operations will fail.
SetReadDeadline(time.Time) error
// SetWriteDeadline sets a time after which future Write operations will fail.
SetWriteDeadline(time.Time) error
// HalfClose sends a data frame with a fin flag set to half-close the stream from the local side.
HalfClose([]byte) (int, error)
// Id returns the stream's id.
Id() StreamId
// StreamType returns the stream's type
StreamType() StreamType
// Session returns the session object this stream is running on.
Session() Session
// RemoteAddr returns the session transport's remote address.
RemoteAddr() net.Addr
// LocalAddr returns the session transport's local address.
LocalAddr() net.Addr
}
// Session multiplexes many Streams over a single underlying stream transport.
// Both sides of a muxado session can open new Streams. Sessions can also accept
// new streams from the remote side.
//
// A muxado Session implements the net.Listener interface, returning new Streams from the remote side.
type Session interface {
// Open initiates a new stream on the session. It is equivalent to OpenStream(0, 0, false)
Open() (Stream, error)
// OpenStream initiates a new stream on the session. A caller can specify a stream's priority and an opaque stream type.
// Setting fin to true will cause the stream to be half-closed from the local side immediately upon creation.
OpenStream(priority StreamPriority, streamType StreamType, fin bool) (Stream, error)
// Accept returns the next stream initiated by the remote side
Accept() (Stream, error)
// Kill closes the underlying transport stream immediately.
//
// You SHOULD always prefer to call Close() instead so that the connection
// closes cleanly by sending a GoAway frame.
Kill() error
// Close instructs the session to close cleanly, sending a GoAway frame if one hasn't already been sent.
//
// This implementation does not "linger". Pending writes on streams may fail.
//
// You MAY call Close() more than once. Each time after
// the first, Close() will return an error.
Close() error
// GoAway instructs the other side of the connection to stop
// initiating new streams by sending a GoAway frame. Most clients
// will just call Close(), but you may want explicit control of this
// in order to facilitate clean shutdowns.
//
// You MAY call GoAway() more than once. Each time after the first,
// GoAway() will return an error.
GoAway(ErrorCode, []byte) error
// LocalAddr returns the local address of the transport stream over which the session is running.
LocalAddr() net.Addr
// RemoteAddr returns the address of the remote side of the transport stream over which the session is running.
RemoteAddr() net.Addr
// Wait blocks until the session has shutdown and returns the error code for session termination. It also
// returns the error that caused the session to terminate as well as any debug information sent in the GoAway
// frame by the remote side.
Wait() (code ErrorCode, err error, debug []byte)
// NetListener returns an adaptor object which allows this Session to be used as a net.Listener. The returned
// net.Listener returns new streams initiated by the remote side as net.Conn's when calling Accept().
NetListener() net.Listener
// NetDial is a function that implements the same API as net.Dial and can be used in place of it. Users should keep
// in mind that it is the same as a call to Open(). It ignores both arguments passed to it and always initiates a new stream
// to the remote side.
NetDial(_, _ string) (net.Conn, error)
}
{
"name": "muxado",
"author": "whyrusleeping",
"version": "1.0.0",
"language": "go",
"gx": {
"dvcsimport": "github.com/inconshreveable/muxado"
}
}
package buffer
import (
"errors"
"io"
)
var (
FullError = errors.New("Buffer is full")
)
// readInto reads from rd into p until p is full or an error occurs
func readInto(rd io.Reader, p []byte) (n int, err error) {
var nr int
for n < len(p) {
nr, err = rd.Read(p[n:])
n += nr
if err != nil {
return
}
}
return
}
// A circular buffer on top of a byte-array
// NOTE: It does not implement the Write() method, it implements ReadFrom()
// to avoid copies
type Circular struct {
buf []byte // the bytes
size int // == len(buf)
head int // index of the next byte to read
tail int // index one past the last byte available to read (the next write position)
}
// Returns a new circular buffer of the given size
func NewCircular(size int) *Circular {
return &Circular{
buf: make([]byte, size+1),
size: size + 1,
}
}
// Copy data from the given reader into the buffer
// Any errors encountered while reading are returned EXCEPT io.EOF.
// If the reader fills the buffer, it returns buffer.FullError
func (c *Circular) ReadFrom(rd io.Reader) (n int, err error) {
// IF:
// [---H+++T--]
if c.tail >= c.head {
n, err = readInto(rd, c.buf[c.tail:])
c.tail = (c.tail + n) % c.size
if err == io.EOF {
return n, nil
} else if err != nil {
return
}
}
// NOW:
// [T---H++++] or [++T--H+++]
n2, err := readInto(rd, c.buf[c.tail:c.head])
n += n2
c.tail += n2
if err == nil {
err = FullError
} else if err == io.EOF {
err = nil
}
return
}
// Read data out of the buffer. This never fails but may
// return n==0 if there is no data to be read
func (c *Circular) Read(p []byte) (n int, err error) {
if c.head > c.tail {
n = copy(p, c.buf[c.head:])
c.head = (c.head + n) % c.size
if c.head != 0 {
return
}
}
n2 := copy(p[n:], c.buf[c.head:c.tail])
n += n2
c.head += n2
return
}
package buffer
import (
"bytes"
"reflect"
"testing"
)
func incBuf(start, size int) []byte {
b := make([]byte, size)
for i := 0; i < size; i++ {
b[i] = byte((start + i) % 16)
}
return b
}
func testBuffer() *Circular {
c := NewCircular(15)
c.buf = incBuf(0, 16)
return c
}
func TestEmptyRead(t *testing.T) {
t.Parallel()
var p [1]byte
c := NewCircular(16)
n, err := c.Read(p[:])
if err != nil {
t.Fatalf("Error on read operation: %v")
}
if n != 0 {
t.Errorf("Read %d bytes, expected 0", n)
}
}
// Test Read: [H+++T---]
func TestStartRead(t *testing.T) {
t.Parallel()
c := testBuffer()
readSize := 8
p := make([]byte, readSize+1)
c.tail = readSize
n, err := c.Read(p)
if err != nil {
t.Fatalf("Error while reading: %v", err)
}
if n != readSize {
t.Fatalf("Read expected %d bytes, got %d", readSize, n)
}
expected := incBuf(0, 8)
got := p[:readSize]
if !reflect.DeepEqual(expected, got) {
t.Errorf("Wrong buffer values read. Expected %v, got %v", expected, got)
}
}
func TestMiddleRead(t *testing.T) {
t.Parallel()
c := testBuffer()
readSize := 8
p := make([]byte, readSize+1)
c.head = 4
c.tail = 12
n, err := c.Read(p)
if err != nil {
t.Fatalf("Error while reading: %v", err)
}
if n != readSize {
t.Fatalf("Read expected %d bytes, got %d", readSize)
}
expected := incBuf(4, 8)
got := p[:readSize]
if !reflect.DeepEqual(expected, got) {
t.Errorf("Wrong buffer values read. Expected %v, got %v", expected, got)
}
}
func TestTwoReads(t *testing.T) {
t.Parallel()
c := testBuffer()
readSize := 4
p := make([]byte, readSize)
c.head = 4
c.tail = 12
for i := 0; i < 2; i++ {
n, err := c.Read(p)
if err != nil {
t.Fatalf("Error while reading: %v", err)
}
if n != readSize {
t.Fatalf("Wrong read size. Expected %d, got %d", readSize, n)
}
expected := incBuf(4+(4*i), 4)
if !reflect.DeepEqual(p, expected) {
t.Fatalf("Wrong buffer values for read #%d. Expected %v, got %v", i+1, expected, p)
}
}
}
func TestReadTailZero(t *testing.T) {
t.Parallel()
c := testBuffer()
readSize := 4
p := make([]byte, readSize*2)
c.head = 12
c.tail = 0
n, err := c.Read(p)
if err != nil {
t.Fatalf("Error while reading: %v", err)
}
if n != readSize {
t.Fatalf("Wrong read size. Expected %d, got %d", readSize, n)
}
expected := incBuf(12, 4)
got := p[:readSize]
if !reflect.DeepEqual(got, expected) {
t.Fatalf("Wrong buffer values for read. Expected %v, got %v", expected, got)
}
}
func TestReadWrap(t *testing.T) {
t.Parallel()
c := testBuffer()
readSize := 14
p := make([]byte, readSize*2)
c.head = 12
c.tail = 10
n, err := c.Read(p)
if err != nil {
t.Fatalf("Error while reading: %v", err)
}
if n != readSize {
t.Fatalf("Wrong read size. Expected %d, got %d", readSize, n)
}
expected := incBuf(12, readSize)
got := p[:readSize]
if !reflect.DeepEqual(got, expected) {
t.Fatalf("Wrong buffer values for read. Expected %v, got %v", expected, got)
}
}
func TestEmptyReadAfterExhaustion(t *testing.T) {
t.Parallel()
c := testBuffer()
readSize := 14
p := make([]byte, readSize*2)
c.head = 12
c.tail = 10
n, err := c.Read(p)
if err != nil {
t.Fatalf("Error while reading: %v", err)
}
if n != readSize {
t.Fatalf("Wrong read size. Expected %d, got %d", readSize, n)
}
expected := incBuf(12, readSize)
got := p[:readSize]
if !reflect.DeepEqual(got, expected) {
t.Fatalf("Wrong buffer values for read. Expected %v, got %v", expected, p)
}
n, err = c.Read(p)
if err != nil {
t.Fatalf("Error while reading: %v", err)
}
if n != 0 {
t.Fatalf("Wrong read size. Expected 0, got %d", n)
}
}
func TestWriteTooBig(t *testing.T) {
t.Parallel()
size := 16
p := bytes.NewBuffer(make([]byte, size+1))
c := NewCircular(size)
_, err := c.ReadFrom(p)
if err != FullError {
t.Errorf("Expected FULL error but got %v", err)
}
}
func TestWriteReadFullFromZero(t *testing.T) {
toWrite := incBuf(0, 16)
c := NewCircular(16)
n, err := c.ReadFrom(bytes.NewBuffer(toWrite))
if err != nil {
t.Fatalf("Error while writing: %v", err)
}
if n != 16 {
t.Fatalf("Wrong number of bytes written. Expceted 16, got %d", n)
}
p := make([]byte, 16)
n, err = c.Read(p)
if err != nil {
t.Fatalf("Error while reading: %v", err)
}
if n != 16 {
t.Fatalf("")
}
}
package buffer
import (
"errors"
"io"
"io/ioutil"
"sync"
"time"
)
var (
AlreadyClosed = errors.New("Buffer already closed")
)
// A specialized concurrent circular buffer intended to buffer a stream's inbound data with the following properties:
// - Minimizes copies by skipping the buffer if a write occurs while a reader is waiting
// - Provides a mechanism to time out reads after a deadline
// - Provides a mechanism to set an 'error' that will fail reads when the buffer is empty
type waitingReader struct {
buf []byte
n int
}
type Inbound struct {
*Circular
*sync.Cond
err error
waitingReader
deadline time.Time
timer *time.Timer
}
func NewInbound(size int) *Inbound {
return &Inbound{
Circular: NewCircular(size),
Cond: sync.NewCond(new(sync.Mutex)),
}
}
func (b *Inbound) SetDeadline(t time.Time) {
b.L.Lock()
// set the deadline
b.deadline = t
// how long until the deadline
delay := t.Sub(time.Now())
if b.timer != nil {
b.timer.Stop()
}
// after the delay, wake up waiters
b.timer = time.AfterFunc(delay, func() {
b.Broadcast()
})
b.L.Unlock()
}
func (b *Inbound) SetError(err error) {
b.L.Lock()
b.err = err
b.Broadcast()
b.L.Unlock()
}
func (b *Inbound) GetError() (err error) {
b.L.Lock()
err = b.err
b.L.Unlock()
return
}
func (b *Inbound) ReadFrom(rd io.Reader) (n int, err error) {
b.L.Lock()
if b.err != nil {
b.L.Unlock()
if _, err = ioutil.ReadAll(rd); err != nil {
return
}
return 0, AlreadyClosed
}
// write directly to a reader's buffer, if possible
if b.waitingReader.buf != nil {
b.waitingReader.n, err = readInto(rd, b.waitingReader.buf)
n += b.waitingReader.n
b.waitingReader.buf = nil
if err != nil {
if err == io.EOF {
// EOF is not an error
err = nil
}
b.Broadcast()
b.L.Unlock()
return
}
}
// write the rest to buffer
var writeN int
writeN, err = b.Circular.ReadFrom(rd)
n += writeN
b.Broadcast()
b.L.Unlock()
return
}
func (b *Inbound) Read(p []byte) (n int, err error) {
b.L.Lock()
var wait *waitingReader
for {
// we got a direct write to our buffer
if wait != nil && wait.n != 0 {
n = wait.n
break
}
// check for timeout
if !b.deadline.IsZero() {
if time.Now().After(b.deadline) {
err = errors.New("Read timeout")
break
}
}
// try to read from the buffer
n, _ = b.Circular.Read(p)
// successfully read some data
if n != 0 {
break
}
// there's an error
if b.err != nil {
err = b.err
break
}
// register for a direct write
if b.waitingReader.buf == nil {
wait = &b.waitingReader
wait.buf = p
wait.n = 0
}
// no data, wait
b.Wait()
}
b.L.Unlock()
return
}
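// Illustrative usage sketch (framePayload is a hypothetical io.Reader holding
// a received data frame's payload): the session feeds inbound data with
// ReadFrom while a caller blocks in Read until data, an error, or the
// deadline arrives.
//
//	buf := NewInbound(64 * 1024)
//	buf.SetDeadline(time.Now().Add(5 * time.Second))
//	go func() {
//		if _, err := buf.ReadFrom(framePayload); err != nil {
//			buf.SetError(err)
//		}
//	}()
//	p := make([]byte, 1024)
//	n, err := buf.Read(p)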
package buffer
import (
"sync"
)
type Outbound struct {
val int
err error
*sync.Cond
}
func NewOutbound(size int) *Outbound {
return &Outbound{val: size, Cond: sync.NewCond(new(sync.Mutex))}
}
func (b *Outbound) Increment(inc int) {
b.L.Lock()
b.val += inc
b.Broadcast()
b.L.Unlock()
}
func (b *Outbound) SetError(err error) {
b.L.Lock()
b.err = err
b.Broadcast()
b.L.Unlock()
}
func (b *Outbound) Decrement(dec int) (ret int, err error) {
if dec == 0 {
return
}
b.L.Lock()
for {
if b.err != nil {
err = b.err
break
}
if b.val > 0 {
if dec > b.val {
ret = b.val
b.val = 0
break
} else {
b.val -= dec
ret = dec
break
}
} else {
b.Wait()
}
}
b.L.Unlock()
return
}
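// Illustrative usage sketch: Outbound acts as the stream's send window. A
// writer calls Decrement to reserve capacity before sending, and the session
// calls Increment when the remote side grants more window (e.g. via a wndinc
// frame). The payload variable here is hypothetical.
//
//	window := NewOutbound(64 * 1024)
//	// writer side: reserve window before sending payload[:allowed]
//	allowed, err := window.Decrement(len(payload))
//	// session side: remote peer granted more window
//	window.Increment(32 * 1024)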
package buffer
func BothClosed(in *Inbound, out *Outbound) (closed bool) {
in.L.Lock()
out.L.Lock()
closed = (in.err != nil && out.err != nil)
out.L.Unlock()
in.L.Unlock()
return
}
package ext
import (
"QmfEm573cZeq3LpgccZMpngV6dXbm5gfU23F5nNUuhSxxJ/muxado/proto"
)
const (
heartbeatExtensionType = proto.MinExtensionType + iota
)
package ext
// XXX: There's no logging around heartbeats - how can we do this in a way that is useful
// as a library?
//
// XXX: When we close the session because of a lost heartbeat or because of an error in the
// heartbeating, there is no way for the application to tell why; the Session will just appear to stop working
import (
proto "QmfEm573cZeq3LpgccZMpngV6dXbm5gfU23F5nNUuhSxxJ/muxado/proto"
"QmfEm573cZeq3LpgccZMpngV6dXbm5gfU23F5nNUuhSxxJ/muxado/proto/frame"
"encoding/binary"
"io"
"time"
)
const (
defaultHeartbeatInterval = 3 * time.Second
defaultHeartbeatTolerance = 10 * time.Second
)
type Heartbeat struct {
sess proto.ISession
accept proto.ExtAccept
mark chan int
interval time.Duration
tolerance time.Duration
}
func NewDefaultHeartbeat() *Heartbeat {
return NewHeartbeat(defaultHeartbeatInterval, defaultHeartbeatTolerance)
}
func NewHeartbeat(interval, tolerance time.Duration) *Heartbeat {
return &Heartbeat{
mark: make(chan int),
interval: interval,
tolerance: tolerance,
}
}
func (h *Heartbeat) Start(sess proto.ISession, accept proto.ExtAccept) frame.StreamType {
h.sess = sess
h.accept = accept
go h.respond()
go h.request()
go h.check()
return heartbeatExtensionType
}
func (h *Heartbeat) check() {
t := time.NewTimer(h.interval + h.tolerance)
for {
select {
case <-t.C:
// time out waiting for a response!
h.sess.Close()
return
case <-h.mark:
t.Reset(h.interval + h.tolerance)
}
}
}
func (h *Heartbeat) respond() {
// close the session on any errors
defer h.sess.Close()
stream, err := h.accept()
if err != nil {
return
}
// read the next heartbeat id and respond
buf := make([]byte, 4)
for {
_, err := io.ReadFull(stream, buf)
if err != nil {
return
}
_, err = stream.Write(buf)
if err != nil {
return
}
}
}
func (h *Heartbeat) request() {
// close the session on any errors
defer h.sess.Close()
// request highest possible priority for heartbeats
priority := frame.StreamPriority(0x7FFFFFFF)
stream, err := h.sess.OpenStream(priority, heartbeatExtensionType, false)
if err != nil {
return
}
// send heartbeats and then check that we got them back
var id uint32
for {
time.Sleep(h.interval)
if err := binary.Write(stream, binary.BigEndian, id); err != nil {
return
}
var respId uint32
if err := binary.Read(stream, binary.BigEndian, &respId); err != nil {
return
}
if id != respId {
return
}
// record the time
h.mark <- 1
}
}
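// Illustrative sketch (mirroring how Client in client.go wires up the default
// heartbeat; conn is assumed to be a connected net.Conn): a session with a
// custom heartbeat interval and tolerance can be built by passing a Heartbeat
// as an extension.
//
//	hb := ext.NewHeartbeat(10*time.Second, 30*time.Second)
//	sess := proto.NewSession(conn, proto.NewStream, true, []proto.Extension{hb})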
package frame
import (
"io"
)
const (
// data frames are actually longer, but they are variable length
dataFrameSize = headerSize
)
type RStreamData struct {
Header
fixed [dataFrameSize]byte
toRead io.LimitedReader // when reading, the underlying connection's io.Reader is handed up
}
func (f *RStreamData) Reader() io.Reader {
return &f.toRead
}
func (f *RStreamData) readFrom(d deserializer) (err error) {
// not using io.LimitReader to avoid a heap memory allocation in the hot path
f.toRead.R = d
f.toRead.N = int64(f.Length())
return
}
// WStreamData is a StreamData frame that you can write
// It delivers opaque data on a stream to the application layer
type WStreamData struct {
Header
fixed [dataFrameSize]byte
toWrite []byte // when writing, you just pass a byte slice to write
}
func (f *WStreamData) writeTo(s serializer) (err error) {
if _, err = s.Write(f.fixed[:]); err != nil {
return err
}
if _, err = s.Write(f.toWrite); err != nil {
return err
}
return
}
func (f *WStreamData) Set(streamId StreamId, data []byte, fin bool) (err error) {
var flags flagsType
if fin {
flags.Set(flagFin)
}
if err = f.Header.SetAll(TypeStreamData, len(data), streamId, flags); err != nil {
return
}
f.toWrite = data
return
}
func NewWStreamData() (f *WStreamData) {
f = new(WStreamData)
f.Header = f.fixed[:headerSize]
return
}
package frame
import (
"bytes"
"io/ioutil"
"reflect"
"testing"
)
type fakeTrans struct {
*bytes.Buffer
}
func (c *fakeTrans) Close() error { return nil }
func loadedTrans(p []byte) (*fakeTrans, *BasicTransport) {
trans := &fakeTrans{bytes.NewBuffer(p)}
return trans, NewBasicTransport(trans)
}
type DataTestParams struct {
streamId StreamId
data []byte
fin bool
}
func TestSerializeData(t *testing.T) {
t.Parallel()
cases := []struct {
params DataTestParams
expected []byte
}{
{
// test a generic data frame
DataTestParams{0x49a1bb00, []byte{0x00, 0x01, 0x02, 0x03, 0x04}, false},
[]byte{0x0, 0x5, 0x0, TypeStreamData, 0x49, 0xa1, 0xbb, 0x00, 0x00, 0x01, 0x02, 0x03, 0x04},
},
{
// test a frame with fin
DataTestParams{streamMask, []byte{0xFF, 0xEE, 0xDD, 0xCC, 0xBB, 0xAA, 0x99, 0x88, 0x77, 0x66, 0x55, 0x44, 0x33, 0x22, 0x11, 0x00}, true},
[]byte{0x00, 0x10, flagFin, TypeStreamData, 0x7F, 0xFF, 0xFF, 0xFF, 0xFF, 0xEE, 0xDD, 0xCC, 0xBB, 0xAA, 0x99, 0x88, 0x77, 0x66, 0x55, 0x44, 0x33, 0x22, 0x11, 0x00},
},
{
// test a zero-length frame
DataTestParams{0x0, []byte{}, false},
[]byte{0x0, 0x0, 0x0, TypeStreamData, 0x0, 0x0, 0x0, 0x0},
},
}
for _, tcase := range cases {
buf, trans := loadedTrans([]byte{})
var f *WStreamData = NewWStreamData()
if err := f.Set(tcase.params.streamId, tcase.params.data, tcase.params.fin); err != nil {
t.Fatalf("Error while setting params %v!", tcase.params)
}
if err := f.writeTo(trans); err != nil {
t.Fatalf("Error while writing %v!", tcase.params)
}
if !reflect.DeepEqual(tcase.expected, buf.Bytes()) {
t.Errorf("Failed to serialize STREAM_DATA, expected: %v got %v", tcase.expected, buf.Bytes())
}
}
}
func TestDeserializeData(t *testing.T) {
_, trans := loadedTrans([]byte{0x00, 0x10, flagFin, TypeStreamData, 0x7F, 0xFF, 0xFF, 0xFF, 0xFF, 0xEE, 0xDD, 0xCC, 0xBB, 0xAA, 0x99, 0x88, 0x77, 0x66, 0x55, 0x44, 0x33, 0x22, 0x11, 0x00})
h := newHeader()
if err := h.readFrom(trans); err != nil {
t.Fatalf("Failed to read header")
}
var f RStreamData
f.Header = h
if err := f.readFrom(trans); err != nil {
t.Fatalf("Read failed with %v", err)
}
got, err := ioutil.ReadAll(f.Reader())
if err != nil {
t.Fatalf("Error %v while reading data", err)
}
expected := []byte{0xFF, 0xEE, 0xDD, 0xCC, 0xBB, 0xAA, 0x99, 0x88, 0x77, 0x66, 0x55, 0x44, 0x33, 0x22, 0x11, 0x00}
if !reflect.DeepEqual(expected, got) {
t.Errorf("Wrong bytes read from transport. Expected %v, got %v", expected, got)
}
if !f.Fin() {
t.Errorf("Fin flag was not deserialized")
}
}
func TestTooLongSerializeData(t *testing.T) {
t.Parallel()
var f *WStreamData = NewWStreamData()
if err := f.Set(0, make([]byte, lengthMask+1), true); err == nil {
t.Errorf("Expected error when setting too long buffer, got none.")
}
}
func TestLengthLimitationData(t *testing.T) {
dataLen := 0x4
_, trans := loadedTrans([]byte{0x00, byte(dataLen), 0x0, TypeStreamData, 0x7F, 0xFF, 0xFF, 0xFF, 0xFF, 0xEE, 0xDD, 0xCC, 0xBB, 0xAA, 0x99, 0x88, 0x77, 0x66, 0x55, 0x44, 0x33, 0x22, 0x11, 0x00})
h := newHeader()
if err := h.readFrom(trans); err != nil {
t.Fatalf("Failed to read header")
}
var f RStreamData
f.Header = h
if err := f.readFrom(trans); err != nil {
t.Fatalf("Read failed with %v", err)
}
got, err := ioutil.ReadAll(f.Reader())
if err != nil {
t.Fatalf("Error %v while reading data", err)
}
if len(got) != dataLen {
t.Errorf("Read with wrong number of bytes, got %d expected %d", len(got), 4)
}
expected := []byte{0xFF, 0xEE, 0xDD, 0xCC}
if !reflect.DeepEqual(expected, got) {
t.Errorf("Wrong bytes read from transport. Expected %v, got %v", expected, got)
}
}
package frame
import "fmt"
import "io"
type DebugTransport struct {
prefix string
*BasicTransport
}
func (t *DebugTransport) Write(buf []byte) (int, error) {
fmt.Printf("%v writes %d bytes: %x\n", t.prefix, len(buf), buf)
return t.BasicTransport.Write(buf)
}
func (t *DebugTransport) WriteFrame(frame WFrame) (err error) {
// each frame knows how to write itself to the framer
return frame.writeTo(t)
}
func (t *DebugTransport) ReadFrame() (f RFrame, err error) {
f, err = t.BasicTransport.ReadFrame()
fmt.Printf("%v reads Header length: %v\n", t.prefix, t.Header.Length())
fmt.Printf("%v reads Header type: %v\n", t.prefix, t.Header.Type())
fmt.Printf("%v reads Header stream id: %v\n", t.prefix, t.Header.StreamId())
fmt.Printf("%v reads Header fin: %v\n", t.prefix, t.Header.Fin())
return
}
func NewDebugTransport(rwc io.ReadWriteCloser, prefix string) *DebugTransport {
trans := &DebugTransport{
prefix: prefix,
BasicTransport: &BasicTransport{ReadWriteCloser: rwc, Header: make([]byte, headerSize)},
}
return trans
}
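// Illustrative usage sketch (conn is assumed to be an io.ReadWriteCloser such
// as a net.Conn): wrap the transport to print every frame written to or read
// from it, tagging each log line with the given prefix.
//
//	trans := NewDebugTransport(conn, "client")
//	f, err := trans.ReadFrame()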
package frame
import (
"fmt"
)
const (
NoError = iota
ProtocolError
InternalError
FlowControlError
StreamClosed
FrameSizeError
RefusedStream
Cancel
NoSuchError
)
type FramingError struct {
error
}
func protoError(fmtstr string, args ...interface{}) FramingError {
return FramingError{fmt.Errorf(fmtstr, args...)}
}
package frame
import "io"
const (
goAwayBodySize = 8
goAwayFrameSize = headerSize + goAwayBodySize
)
// Instruct the remote side not to initiate new streams
type RGoAway struct {
Header
body [goAwayBodySize]byte
debug []byte
}
func (f *RGoAway) LastStreamId() StreamId {
return StreamId(order.Uint32(f.body[0:]) & streamMask)
}
func (f *RGoAway) ErrorCode() ErrorCode {
return ErrorCode(order.Uint32(f.body[4:]))
}
func (f *RGoAway) Debug() []byte {
return f.debug
}
func (f *RGoAway) readFrom(d deserializer) (err error) {
if _, err = io.ReadFull(d, f.body[:]); err != nil {
return
}
f.debug = make([]byte, f.Length()-goAwayBodySize)
if _, err = io.ReadFull(d, f.debug); err != nil {
return
}
return
}
type WGoAway struct {
Header
data [goAwayFrameSize]byte
debug []byte
}
func (f *WGoAway) writeTo(s serializer) (err error) {
if _, err = s.Write(f.data[:]); err != nil {
return
}
if _, err = s.Write(f.debug); err != nil {
return
}
return
}
func (f *WGoAway) Set(lastStreamId StreamId, errorCode ErrorCode, debug []byte) (err error) {
if err = f.Header.SetAll(TypeGoAway, len(debug)+goAwayFrameSize, 0, 0); err != nil {
return
}
if lastStreamId > streamMask {
err = protoError("Related stream id %d is out of range", lastStreamId)
return
}
order.PutUint32(f.data[headerSize:], uint32(lastStreamId))
order.PutUint32(f.data[headerSize+4:], uint32(errorCode))
return
}
func NewWGoAway() (f *WGoAway) {
f = new(WGoAway)
f.Header = Header(f.data[:headerSize])
return
}