mux.go 3.47 KB
Newer Older
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
1
package protocol
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
2
3
4
5
6
7
8
9

import (
	"fmt"
	"io"
	"sync"

	context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
10
	inet "github.com/jbenet/go-ipfs/p2p/net"
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
11
12
13
14
15
16
	eventlog "github.com/jbenet/go-ipfs/util/eventlog"
	lgbl "github.com/jbenet/go-ipfs/util/eventlog/loggables"
)

var log = eventlog.Logger("net/mux")

Brian Tiger Chow's avatar
Brian Tiger Chow committed
17
// streamHandlerMap maps a protocol ID to the stream handler registered for it.
type streamHandlerMap map[ID]inet.StreamHandler
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
18
19
20
21
22
23
24
25
26

// Mux provides simple stream multixplexing.
// It helps you precisely when:
//  * You have many streams
//  * You have function handlers
//
// It contains the handlers for each protocol accepted.
// It dispatches handlers for streams opened by remote peers.
type Mux struct {
27
28
	// defaultHandler handles unknown protocols. Callers modify at your own risk.
	defaultHandler inet.StreamHandler
Brian Tiger Chow's avatar
Brian Tiger Chow committed
29
30
31
32

	lock     sync.RWMutex
	handlers streamHandlerMap
}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
33

Brian Tiger Chow's avatar
Brian Tiger Chow committed
34
35
36
37
func NewMux() *Mux {
	return &Mux{
		handlers: streamHandlerMap{},
	}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
38
39
40
}

// Protocols returns the list of protocols this muxer has handlers for
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
41
func (m *Mux) Protocols() []ID {
Brian Tiger Chow's avatar
Brian Tiger Chow committed
42
43
44
	m.lock.RLock()
	l := make([]ID, 0, len(m.handlers))
	for p := range m.handlers {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
45
46
		l = append(l, p)
	}
Brian Tiger Chow's avatar
Brian Tiger Chow committed
47
	m.lock.RUnlock()
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
48
49
50
51
52
	return l
}

// readHeader reads the stream and returns the next Handler function
// according to the muxer encoding.
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
53
func (m *Mux) readHeader(s io.Reader) (ID, inet.StreamHandler, error) {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
54
	// log.Error("ReadProtocolHeader")
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
55
	p, err := ReadHeader(s)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
56
57
58
59
60
	if err != nil {
		return "", nil, err
	}

	// log.Debug("readHeader got:", p)
Brian Tiger Chow's avatar
Brian Tiger Chow committed
61
62
63
	m.lock.RLock()
	h, found := m.handlers[p]
	m.lock.RUnlock()
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
64
65

	switch {
66
67
68
	case !found && m.defaultHandler != nil:
		return p, m.defaultHandler, nil
	case !found && m.defaultHandler == nil:
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
69
70
71
72
73
74
75
76
		return p, nil, fmt.Errorf("%s no handler with name: %s (%d)", m, p, len(p))
	default:
		return p, h, nil
	}
}

// String returns the muxer's printing representation
func (m *Mux) String() string {
Brian Tiger Chow's avatar
Brian Tiger Chow committed
77
78
79
	m.lock.RLock()
	defer m.lock.RUnlock()
	return fmt.Sprintf("<Muxer %p %d>", m, len(m.handlers))
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
80
81
}

82
83
84
85
86
87
// SetDefaultHandler sets the handler used for streams whose protocol has no
// registered handler. Passing nil clears it, so that unknown protocols are
// reported as errors instead. The write is done while holding the muxer lock.
func (m *Mux) SetDefaultHandler(h inet.StreamHandler) {
	m.lock.Lock()
	defer m.lock.Unlock()

	m.defaultHandler = h
}

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
88
89
// SetHandler sets the protocol handler on the Network's Muxer.
// This operation is threadsafe.
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
90
func (m *Mux) SetHandler(p ID, h inet.StreamHandler) {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
91
	log.Debugf("%s setting handler for protocol: %s (%d)", m, p, len(p))
Brian Tiger Chow's avatar
Brian Tiger Chow committed
92
93
94
	m.lock.Lock()
	m.handlers[p] = h
	m.lock.Unlock()
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
}

// Handle reads the next protocol name off the Stream and calls the matching
// handler function. The work is delegated to HandleSync on a new goroutine,
// so Handle returns immediately and never blocks the caller.
func (m *Mux) Handle(s inet.Stream) {
	go m.HandleSync(s)
}

// HandleSync reads the next name off the Stream, and calls a handler function
// This is done synchronously. The handler function will return before
// HandleSync returns.
func (m *Mux) HandleSync(s inet.Stream) {
	ctx := context.Background()

	name, handler, err := m.readHeader(s)
	if err != nil {
		err = fmt.Errorf("protocol mux error: %s", err)
		log.Error(err)
		log.Event(ctx, "muxError", lgbl.Error(err))
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
114
		s.Close()
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
		return
	}

	log.Infof("muxer handle protocol: %s", name)
	log.Event(ctx, "muxHandle", eventlog.Metadata{"protocol": name})
	handler(s)
}

// ReadLengthPrefix reads a length-prefixed name from r: a single byte giving
// the length (0-255), followed by exactly that many bytes of name.
//
// It returns the name as a string. Any error from io.ReadFull is returned
// unchanged, together with an empty string.
func ReadLengthPrefix(r io.Reader) (string, error) {
	// The first byte on the wire is the length of the identifier.
	var prefix [1]byte
	_, err := io.ReadFull(r, prefix[:])
	if err != nil {
		return "", err
	}

	// The identifier itself follows immediately.
	ident := make([]byte, int(prefix[0]))
	_, err = io.ReadFull(r, ident)
	if err != nil {
		return "", err
	}

	return string(ident), nil
}