Commit a103a2da authored by Juan Batiz-Benet's avatar Juan Batiz-Benet
Browse files

protocol and muxer pkg

parent 72df463f
package mux
import (
"fmt"
"io"
"sync"
context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
inet "github.com/jbenet/go-ipfs/p2p/net"
protocol "github.com/jbenet/go-ipfs/p2p/protocol"
eventlog "github.com/jbenet/go-ipfs/util/eventlog"
lgbl "github.com/jbenet/go-ipfs/util/eventlog/loggables"
)
var log = eventlog.Logger("net/mux")
type StreamHandlerMap map[protocol.ID]inet.StreamHandler
// Mux provides simple stream multixplexing.
// It helps you precisely when:
// * You have many streams
// * You have function handlers
//
// It contains the handlers for each protocol accepted.
// It dispatches handlers for streams opened by remote peers.
//
// WARNING: this datastructure IS NOT threadsafe.
// do not modify it once the network is using it.
type Mux struct {
Default inet.StreamHandler // handles unknown protocols.
Handlers StreamHandlerMap
sync.RWMutex
}
// Protocols returns the list of protocols this muxer has handlers for
func (m *Mux) Protocols() []protocol.ID {
m.RLock()
l := make([]protocol.ID, 0, len(m.Handlers))
for p := range m.Handlers {
l = append(l, p)
}
m.RUnlock()
return l
}
// readHeader reads the stream and returns the next Handler function
// according to the muxer encoding.
func (m *Mux) readHeader(s io.Reader) (protocol.ID, inet.StreamHandler, error) {
// log.Error("ReadProtocolHeader")
p, err := protocol.ReadHeader(s)
if err != nil {
return "", nil, err
}
// log.Debug("readHeader got:", p)
m.RLock()
h, found := m.Handlers[p]
m.RUnlock()
switch {
case !found && m.Default != nil:
return p, m.Default, nil
case !found && m.Default == nil:
return p, nil, fmt.Errorf("%s no handler with name: %s (%d)", m, p, len(p))
default:
return p, h, nil
}
}
// String returns the muxer's printing representation
func (m *Mux) String() string {
m.RLock()
defer m.RUnlock()
return fmt.Sprintf("<Muxer %p %d>", m, len(m.Handlers))
}
// SetHandler sets the protocol handler on the Network's Muxer.
// This operation is threadsafe.
func (m *Mux) SetHandler(p protocol.ID, h inet.StreamHandler) {
log.Debugf("%s setting handler for protocol: %s (%d)", m, p, len(p))
m.Lock()
m.Handlers[p] = h
m.Unlock()
}
// Handle reads the next name off the Stream, and calls a handler function
// This is done in its own goroutine, to avoid blocking the caller.
func (m *Mux) Handle(s inet.Stream) {
// Flow control and backpressure of Opening streams is broken.
// I believe that spdystream has one set of workers that both send
// data AND accept new streams (as it's just more data). there
// is a problem where if the new stream handlers want to throttle,
// they also eliminate the ability to read/write data, which makes
// forward-progress impossible. Thus, throttling this function is
// -- at this moment -- not the solution. Either spdystream must
// change, or we must throttle another way.
//
// In light of this, we use a goroutine for now (otherwise the
// spdy worker totally blocks, and we can't even read the protocol
// header). The better route in the future is to use a worker pool.
go m.HandleSync(s)
}
// HandleSync reads the next name off the Stream, and calls a handler function
// This is done synchronously. The handler function will return before
// HandleSync returns.
func (m *Mux) HandleSync(s inet.Stream) {
ctx := context.Background()
name, handler, err := m.readHeader(s)
if err != nil {
err = fmt.Errorf("protocol mux error: %s", err)
log.Error(err)
log.Event(ctx, "muxError", lgbl.Error(err))
return
}
log.Infof("muxer handle protocol: %s", name)
log.Event(ctx, "muxHandle", eventlog.Metadata{"protocol": name})
handler(s)
}
// ReadLengthPrefix reads the name from Reader with a length-byte-prefix.
func ReadLengthPrefix(r io.Reader) (string, error) {
// c-string identifier
// the first byte is our length
l := make([]byte, 1)
if _, err := io.ReadFull(r, l); err != nil {
return "", err
}
length := int(l[0])
// the next are our identifier
name := make([]byte, length)
if _, err := io.ReadFull(r, name); err != nil {
return "", err
}
return string(name), nil
}
package mux
import (
"bytes"
"testing"
inet "github.com/jbenet/go-ipfs/p2p/net"
protocol "github.com/jbenet/go-ipfs/p2p/protocol"
)
var testCases = map[string]string{
"/bitswap": "\u0009/bitswap\n",
"/dht": "\u0005/dht\n",
"/ipfs": "\u0006/ipfs\n",
"/ipfs/dksnafkasnfkdajfkdajfdsjadosiaaodj": ")/ipfs/dksnafkasnfkdajfkdajfdsjadosiaaodj\n",
}
func TestWrite(t *testing.T) {
for k, v := range testCases {
var buf bytes.Buffer
if err := protocol.WriteHeader(&buf, protocol.ID(k)); err != nil {
t.Fatal(err)
}
v2 := buf.Bytes()
if !bytes.Equal(v2, []byte(v)) {
t.Errorf("failed: %s - %v != %v", k, []byte(v), v2)
}
}
}
func TestHandler(t *testing.T) {
outs := make(chan string, 10)
h := func(n string) func(s inet.Stream) {
return func(s inet.Stream) {
outs <- n
}
}
m := Mux{Handlers: StreamHandlerMap{}}
m.Default = h("default")
m.Handlers["/dht"] = h("bitswap")
// m.Handlers["/ipfs"] = h("bitswap") // default!
m.Handlers["/bitswap"] = h("bitswap")
m.Handlers["/ipfs/dksnafkasnfkdajfkdajfdsjadosiaaodj"] = h("bitswap")
for k, v := range testCases {
var buf bytes.Buffer
if _, err := buf.Write([]byte(v)); err != nil {
t.Error(err)
continue
}
name, err := protocol.ReadHeader(&buf)
if err != nil {
t.Error(err)
continue
}
if name != protocol.ID(k) {
t.Errorf("name mismatch: %s != %s", k, name)
continue
}
}
}
package protocol
import (
"io"
msgio "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-msgio"
)
// ID is an identifier used to write protocol headers in streams.
type ID string
// These are reserved protocol.IDs.
const (
TestingID ID = "/p2p/_testing"
)
// WriteHeader writes a protocol.ID header to an io.Writer. This is so
// multiple protocols can be multiplexed on top of the same transport.
//
// We use go-msgio varint encoding:
// <varint length><string name>\n
// (the varint includes the \n)
func WriteHeader(w io.Writer, id ID) error {
vw := msgio.NewVarintWriter(w)
s := string(id) + "\n" // add \n
return vw.WriteMsg([]byte(s))
}
// ReadHeader reads a protocol.ID header from an io.Reader. This is so
// multiple protocols can be multiplexed on top of the same transport.
// See WriteHeader.
func ReadHeader(r io.Reader) (ID, error) {
vr := msgio.NewVarintReader(r)
msg, err := vr.ReadMsg()
if err != nil {
return ID(""), err
}
msg = msg[:len(msg)-1] // remove \n
return ID(msg), nil
}
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment