mux.go 3.35 KB
Newer Older
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
1
package protocol
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
2
3
4
5
6
7
8
9

import (
	"fmt"
	"io"
	"sync"

	context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
10
	inet "github.com/jbenet/go-ipfs/p2p/net"
11
	eventlog "github.com/jbenet/go-ipfs/thirdparty/eventlog"
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
12
13
14
15
16
	lgbl "github.com/jbenet/go-ipfs/util/eventlog/loggables"
)

var log = eventlog.Logger("net/mux")

Brian Tiger Chow's avatar
Brian Tiger Chow committed
17
type streamHandlerMap map[ID]inet.StreamHandler
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
18
19
20
21
22
23
24
25
26

// Mux provides simple stream multixplexing.
// It helps you precisely when:
//  * You have many streams
//  * You have function handlers
//
// It contains the handlers for each protocol accepted.
// It dispatches handlers for streams opened by remote peers.
type Mux struct {
Brian Tiger Chow's avatar
Brian Tiger Chow committed
27
28
29

	lock     sync.RWMutex
	handlers streamHandlerMap
30
	defaultHandler inet.StreamHandler
Brian Tiger Chow's avatar
Brian Tiger Chow committed
31
}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
32

Brian Tiger Chow's avatar
Brian Tiger Chow committed
33
34
35
36
func NewMux() *Mux {
	return &Mux{
		handlers: streamHandlerMap{},
	}
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
37
38
39
}

// Protocols returns the list of protocols this muxer has handlers for
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
40
func (m *Mux) Protocols() []ID {
Brian Tiger Chow's avatar
Brian Tiger Chow committed
41
42
43
	m.lock.RLock()
	l := make([]ID, 0, len(m.handlers))
	for p := range m.handlers {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
44
45
		l = append(l, p)
	}
Brian Tiger Chow's avatar
Brian Tiger Chow committed
46
	m.lock.RUnlock()
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
47
48
49
50
51
	return l
}

// readHeader reads the stream and returns the next Handler function
// according to the muxer encoding.
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
52
func (m *Mux) readHeader(s io.Reader) (ID, inet.StreamHandler, error) {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
53
	// log.Error("ReadProtocolHeader")
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
54
	p, err := ReadHeader(s)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
55
56
57
58
59
	if err != nil {
		return "", nil, err
	}

	// log.Debug("readHeader got:", p)
Brian Tiger Chow's avatar
Brian Tiger Chow committed
60
	m.lock.RLock()
61
	defer m.lock.RUnlock()
Brian Tiger Chow's avatar
Brian Tiger Chow committed
62
	h, found := m.handlers[p]
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
63
64

	switch {
65
66
67
	case !found && m.defaultHandler != nil:
		return p, m.defaultHandler, nil
	case !found && m.defaultHandler == nil:
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
68
69
70
71
72
73
74
75
		return p, nil, fmt.Errorf("%s no handler with name: %s (%d)", m, p, len(p))
	default:
		return p, h, nil
	}
}

// String returns the muxer's printing representation
func (m *Mux) String() string {
Brian Tiger Chow's avatar
Brian Tiger Chow committed
76
77
78
	m.lock.RLock()
	defer m.lock.RUnlock()
	return fmt.Sprintf("<Muxer %p %d>", m, len(m.handlers))
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
79
80
}

81
82
83
84
85
86
func (m *Mux) SetDefaultHandler(h inet.StreamHandler) {
	m.lock.Lock()
	m.defaultHandler = h
	m.lock.Unlock()
}

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
87
88
// SetHandler sets the protocol handler on the Network's Muxer.
// This operation is threadsafe.
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
89
func (m *Mux) SetHandler(p ID, h inet.StreamHandler) {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
90
	log.Debugf("%s setting handler for protocol: %s (%d)", m, p, len(p))
Brian Tiger Chow's avatar
Brian Tiger Chow committed
91
92
93
	m.lock.Lock()
	m.handlers[p] = h
	m.lock.Unlock()
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
}

// Handle reads the next name off the Stream, and calls a handler function
// This is done in its own goroutine, to avoid blocking the caller.
func (m *Mux) Handle(s inet.Stream) {
	go m.HandleSync(s)
}

// HandleSync reads the next name off the Stream, and calls a handler function
// This is done synchronously. The handler function will return before
// HandleSync returns.
func (m *Mux) HandleSync(s inet.Stream) {
	ctx := context.Background()

	name, handler, err := m.readHeader(s)
	if err != nil {
		err = fmt.Errorf("protocol mux error: %s", err)
		log.Event(ctx, "muxError", lgbl.Error(err))
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
112
		s.Close()
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
113
114
115
		return
	}

116
	log.Infof("muxer handle protocol %s: %s", s.Conn().RemotePeer(), name)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
	handler(s)
}

// ReadLengthPrefix reads the name from Reader with a length-byte-prefix.
func ReadLengthPrefix(r io.Reader) (string, error) {
	// c-string identifier
	// the first byte is our length
	l := make([]byte, 1)
	if _, err := io.ReadFull(r, l); err != nil {
		return "", err
	}
	length := int(l[0])

	// the next are our identifier
	name := make([]byte, length)
	if _, err := io.ReadFull(r, name); err != nil {
		return "", err
	}

	return string(name), nil
}