stream.go 2.24 KB
Newer Older
Jeromy's avatar
Jeromy committed
1
2
3
4
5
package peerstream

import (
	"fmt"

Jeromy's avatar
Jeromy committed
6
	smux "gx/QmRmT6MSnfhRDW1PTUGSd3z4fqXK48GUequQAZzeT4c5iC/go-stream-muxer"
Jeromy's avatar
Jeromy committed
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
)

// StreamHandler is a function which receives a Stream. It
// allows clients to set a function to receive newly created
// streams, and decide whether to continue adding them.
// It works sort of like a http.HandleFunc.
// Note: the StreamHandler is called sequentially, so spawn
// goroutines or pass the Stream. See EchoHandler.
type StreamHandler func(s *Stream)

// Stream is an io.{Read,Write,Close}r to a remote counterpart.
// It wraps a spdystream.Stream, and links it to a Conn and groups
type Stream struct {
	smuxStream smux.Stream

	conn   *Conn
	groups groupSet
}

func newStream(ss smux.Stream, c *Conn) *Stream {
	s := &Stream{
		conn:       c,
		smuxStream: ss,
		groups:     groupSet{m: make(map[Group]struct{})},
	}
	s.groups.AddSet(&c.groups) // inherit groups
	return s
}

// String returns a string representation of the Stream
func (s *Stream) String() string {
	f := "<peerstream.Stream %s <--> %s>"
	return fmt.Sprintf(f, s.conn.NetConn().LocalAddr(), s.conn.NetConn().RemoteAddr())
}

// SPDYStream returns the underlying *spdystream.Stream
func (s *Stream) Stream() smux.Stream {
	return s.smuxStream
}

// Conn returns the Conn associated with this Stream
func (s *Stream) Conn() *Conn {
	return s.conn
}

// Swarm returns the Swarm asociated with this Stream
func (s *Stream) Swarm() *Swarm {
	return s.conn.swarm
}

// Groups returns the Groups this Stream belongs to
func (s *Stream) Groups() []Group {
	return s.groups.Groups()
}

// InGroup returns whether this stream belongs to a Group
func (s *Stream) InGroup(g Group) bool {
	return s.groups.Has(g)
}

// AddGroup assigns given Group to Stream
func (s *Stream) AddGroup(g Group) {
	s.groups.Add(g)
}

func (s *Stream) Read(p []byte) (n int, err error) {
	return s.smuxStream.Read(p)
}

func (s *Stream) Write(p []byte) (n int, err error) {
	return s.smuxStream.Write(p)
}

func (s *Stream) Close() error {
	return s.conn.swarm.removeStream(s)
}

// StreamsWithGroup narrows down a set of streams to those in given group.
func StreamsWithGroup(g Group, streams []*Stream) []*Stream {
	var out []*Stream
	for _, s := range streams {
		if s.InGroup(g) {
			out = append(out, s)
		}
	}
	return out
}