mux.go 3.54 KB
Newer Older
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
1
package protocol
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
2
3
4
5
6
7

import (
	"fmt"
	"io"
	"sync"

8
9
10
11
	context "github.com/ipfs/go-ipfs/Godeps/_workspace/src/golang.org/x/net/context"
	inet "github.com/ipfs/go-ipfs/p2p/net"
	eventlog "github.com/ipfs/go-ipfs/thirdparty/eventlog"
	lgbl "github.com/ipfs/go-ipfs/util/eventlog/loggables"
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
12
13
14
15
)

// log is the package-level event logger, created with the name "net/mux".
var log = eventlog.Logger("net/mux")

Brian Tiger Chow's avatar
Brian Tiger Chow committed
16
// streamHandlerMap maps a protocol ID to the stream handler stored for it.
type streamHandlerMap map[ID]inet.StreamHandler
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
17
18
19
20
21
22
23
24
25

// Mux provides simple stream multixplexing.
// It helps you precisely when:
//  * You have many streams
//  * You have function handlers
//
// It contains the handlers for each protocol accepted.
// It dispatches handlers for streams opened by remote peers.
type Mux struct {
26
27
	lock           sync.RWMutex
	handlers       streamHandlerMap
28
	defaultHandler inet.StreamHandler
Brian Tiger Chow's avatar
Brian Tiger Chow committed
29
}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
30

Brian Tiger Chow's avatar
Brian Tiger Chow committed
31
32
33
34
func NewMux() *Mux {
	return &Mux{
		handlers: streamHandlerMap{},
	}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
35
36
37
}

// Protocols returns the list of protocols this muxer has handlers for
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
38
func (m *Mux) Protocols() []ID {
Brian Tiger Chow's avatar
Brian Tiger Chow committed
39
40
41
	m.lock.RLock()
	l := make([]ID, 0, len(m.handlers))
	for p := range m.handlers {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
42
43
		l = append(l, p)
	}
Brian Tiger Chow's avatar
Brian Tiger Chow committed
44
	m.lock.RUnlock()
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
45
46
47
48
49
	return l
}

// readHeader reads the stream and returns the next Handler function
// according to the muxer encoding.
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
50
51
func (m *Mux) readHeader(s io.Reader) (ID, inet.StreamHandler, error) {
	p, err := ReadHeader(s)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
52
53
54
55
	if err != nil {
		return "", nil, err
	}

Brian Tiger Chow's avatar
Brian Tiger Chow committed
56
	m.lock.RLock()
57
	defer m.lock.RUnlock()
Brian Tiger Chow's avatar
Brian Tiger Chow committed
58
	h, found := m.handlers[p]
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
59
60

	switch {
61
62
63
	case !found && m.defaultHandler != nil:
		return p, m.defaultHandler, nil
	case !found && m.defaultHandler == nil:
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
64
65
66
67
68
69
70
71
		return p, nil, fmt.Errorf("%s no handler with name: %s (%d)", m, p, len(p))
	default:
		return p, h, nil
	}
}

// String returns the muxer's printing representation
func (m *Mux) String() string {
Brian Tiger Chow's avatar
Brian Tiger Chow committed
72
73
74
	m.lock.RLock()
	defer m.lock.RUnlock()
	return fmt.Sprintf("<Muxer %p %d>", m, len(m.handlers))
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
75
76
}

77
78
79
80
81
82
// SetDefaultHandler stores h as the muxer's default handler.
// The assignment is made while holding the muxer's write lock.
func (m *Mux) SetDefaultHandler(h inet.StreamHandler) {
	m.lock.Lock()
	defer m.lock.Unlock()

	m.defaultHandler = h
}

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
83
84
// SetHandler sets the protocol handler on the Network's Muxer.
// This operation is threadsafe.
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
85
func (m *Mux) SetHandler(p ID, h inet.StreamHandler) {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
86
	log.Debugf("%s setting handler for protocol: %s (%d)", m, p, len(p))
Brian Tiger Chow's avatar
Brian Tiger Chow committed
87
88
89
	m.lock.Lock()
	m.handlers[p] = h
	m.lock.Unlock()
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
90
91
}

Jeromy's avatar
Jeromy committed
92
93
94
95
96
97
98
99
100
// RemoveHandler deletes the handler stored for protocol p on the Network's
// Muxer. Deleting a protocol that has no handler is a no-op.
// This operation is threadsafe.
func (m *Mux) RemoveHandler(p ID) {
	// Logging happens before the write lock is taken; m is passed as a
	// %s argument and its String method acquires the read lock.
	log.Debugf("%s removing handler for protocol: %s (%d)", m, p, len(p))

	m.lock.Lock()
	defer m.lock.Unlock()
	delete(m.handlers, p)
}

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
// Handle reads the next name off the Stream and calls a handler function.
// The work is handed to HandleSync in a new goroutine, so the caller is not
// blocked.
func (m *Mux) Handle(s inet.Stream) {
	go func() {
		m.HandleSync(s)
	}()
}

// HandleSync reads the next name off the Stream, and calls a handler function
// This is done synchronously. The handler function will return before
// HandleSync returns.
func (m *Mux) HandleSync(s inet.Stream) {
	ctx := context.Background()

	name, handler, err := m.readHeader(s)
	if err != nil {
		err = fmt.Errorf("protocol mux error: %s", err)
		log.Event(ctx, "muxError", lgbl.Error(err))
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
117
		s.Close()
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
118
119
120
		return
	}

121
	log.Debugf("muxer handle protocol %s: %s", s.Conn().RemotePeer(), name)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
	handler(s)
}

// ReadLengthPrefix reads a length-prefixed name from r: one byte giving the
// length, followed by exactly that many bytes of name.
//
// Any error from io.ReadFull is returned unchanged together with an empty
// string.
func ReadLengthPrefix(r io.Reader) (string, error) {
	// The prefix is a single byte, so a name is at most 255 bytes long.
	var prefix [1]byte
	if _, err := io.ReadFull(r, prefix[:]); err != nil {
		return "", err
	}

	// Read exactly as many bytes as the prefix announced.
	payload := make([]byte, int(prefix[0]))
	_, err := io.ReadFull(r, payload)
	if err != nil {
		return "", err
	}
	return string(payload), nil
}