relay.go 4.01 KB
Newer Older
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
package relay

import (
	"fmt"
	"io"

	context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
	ctxgroup "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-ctxgroup"
	mh "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multihash"

	inet "github.com/jbenet/go-ipfs/p2p/net"
	peer "github.com/jbenet/go-ipfs/p2p/peer"
	eventlog "github.com/jbenet/go-ipfs/util/eventlog"
)

// log is the package-level event logger, created with the name "relay".
var log = eventlog.Logger("relay")

// ProtocolRelay is the ProtocolID of the Relay Service.
// It is the ID used in this file when opening streams to other peers.
const ProtocolRelay inet.ProtocolID = "/ipfs/relay"

// RelayService is a structure that implements ProtocolRelay.
// It is a simple relay service which forwards traffic
// between two directly connected peers.
//
// The protocol is very simple:
//
//   /ipfs/relay\n
//   <multihash src id>
//   <multihash dst id>
//   <data stream>
//
// A stream whose dst is the local peer is handed to the local handler;
// a stream whose src is the local peer is rejected; any other stream is
// piped to dst over a newly opened stream.
type RelayService struct {
	// Network supplies the local peer ID, is where the relay stream
	// handler is registered, and is used to open outgoing streams.
	Network inet.Network
	handler inet.StreamHandler // for streams sent to us locally.

	// cg is the context group created from the context passed to
	// NewRelayService. It is not used elsewhere in the visible code.
	cg ctxgroup.ContextGroup
}

// NewRelayService constructs a RelayService on top of network n and
// registers the service as n's stream handler for ProtocolRelay.
//
// ctx seeds the service's context group. sh is the handler invoked for
// relayed streams whose destination is the local peer.
func NewRelayService(ctx context.Context, n inet.Network, sh inet.StreamHandler) *RelayService {
	s := &RelayService{
		Network: n,
		handler: sh,
		cg:      ctxgroup.WithContext(ctx),
	}
	// Register under this package's own ProtocolRelay, the same ID that
	// openStreamToPeer dials with, so both ends of a relay hop agree on
	// the protocol ID.
	n.SetHandler(ProtocolRelay, s.requestHandler)
	return s
}

// requestHandler is the stream handler registered with the network for
// the relay protocol. It delegates to handleStream and logs any error
// that comes back, since a stream handler has no way to return one.
func (rs *RelayService) requestHandler(s inet.Stream) {
	err := rs.handleStream(s)
	if err == nil {
		return
	}
	log.Error("RelayService error:", err)
}

// handleStream does the real work for requestHandler and reports
// failures as an error value. The stream is closed when it returns.
//
// After reading the (src, dst) header it takes one of three paths:
//   - src is the local peer: rejected with an error.
//   - dst is the local peer: the stream is handed to the local handler.
//   - otherwise: the stream is piped to dst.
func (rs *RelayService) handleStream(s inet.Stream) error {
	defer s.Close()

	// The header carries the source and destination peer.IDs.
	src, dst, err := ReadHeader(s)
	if err != nil {
		return fmt.Errorf("stream with bad header: %s", err)
	}

	me := rs.Network.LocalPeer()

	if src == me {
		return fmt.Errorf("relaying from self")
	}

	if dst == me {
		// it's for us! yaaay.
		log.Debugf("%s consuming stream from %s", rs.Network.LocalPeer(), src)
		return rs.consumeStream(s)
	}

	// Neither endpoint is local: relay it.
	log.Debugf("%s relaying stream %s <--> %s", rs.Network.LocalPeer(), src, dst)
	return rs.pipeStream(src, dst, s)
}

// consumeStream connects streams directed to the local peer
// to our handler, with the header now stripped (read).
//
// It returns an error if no local handler was configured; otherwise it
// runs the handler on s and returns nil.
func (rs *RelayService) consumeStream(s inet.Stream) error {
	// Calling a nil func value panics. Since this path is triggered by
	// remote peers, report a missing handler as an error instead.
	if rs.handler == nil {
		return fmt.Errorf("no local handler for relayed stream")
	}
	rs.handler(s) // boom.
	return nil
}

// pipeStream relays s to the remote peer dst. It's like `cat`.
//
// It opens a new stream to dst, re-sends the relay header (src, dst) on
// it, and then copies bytes in both directions between s and the new
// stream until both copies have returned.
//
// It returns an error if the outgoing stream cannot be opened, if the
// header cannot be written, or if either copy fails; otherwise nil.
func (rs *RelayService) pipeStream(src, dst peer.ID, s inet.Stream) error {
	s2, err := rs.openStreamToPeer(dst)
	if err != nil {
		return fmt.Errorf("failed to open stream to peer: %s -- %s", dst, err)
	}
	// We opened s2, so we must release it on every return path,
	// including a failed header write. (s is closed by handleStream.)
	defer s2.Close()

	if err := WriteHeader(s2, src, dst); err != nil {
		return err
	}

	// connect the series of tubes.
	// The channel is buffered for both results so neither goroutine
	// can block on its send.
	done := make(chan retio, 2)
	go func() {
		n, err := io.Copy(s2, s)
		done <- retio{n, err}
	}()
	go func() {
		n, err := io.Copy(s, s2)
		done <- retio{n, err}
	}()

	// r1 and r2 arrive in completion order, so the two byte counts in
	// the log line are not tied to a particular direction.
	r1 := <-done
	r2 := <-done
	log.Infof("relayed %d/%d bytes between %s and %s", r1.n, r2.n, src, dst)

	if r1.err != nil {
		return r1.err
	}
	return r2.err
}

// openStreamToPeer opens a relay-protocol stream to peer p through the
// service's Network and returns whatever the network returns.
// For now this can only reach directly connected peers;
// maybe we can do some routing later on.
func (rs *RelayService) openStreamToPeer(p peer.ID) (inet.Stream, error) {
	network := rs.Network
	stream, err := network.NewStream(ProtocolRelay, p)
	return stream, err
}

// ReadHeader reads a relay header from r: two multihashes, returned as
// the source and destination peer IDs in that order.
//
// If either multihash cannot be read, both IDs come back empty along
// with the read error.
func ReadHeader(r io.Reader) (src, dst peer.ID, err error) {
	reader := mh.NewReader(r)

	// ids[0] is the source, ids[1] the destination, in wire order.
	var ids [2]peer.ID
	for i := range ids {
		hash, rerr := reader.ReadMultihash()
		if rerr != nil {
			return "", "", rerr
		}
		ids[i] = peer.ID(hash)
	}

	return ids[0], ids[1], nil
}

// WriteHeader writes a relay header to w: the src peer ID followed by
// the dst peer ID, each encoded as a multihash.
//
// It stops at the first failed write and returns an error that names
// dst and wraps the underlying error text.
func WriteHeader(w io.Writer, src, dst peer.ID) error {
	writer := mh.NewWriter(w)

	// Wire order is source first, then destination.
	for _, id := range []peer.ID{src, dst} {
		if err := writer.WriteMultihash(mh.Multihash(id)); err != nil {
			return fmt.Errorf("failed to write relay header: %s -- %s", dst, err)
		}
	}

	return nil
}

// retio carries the outcome of one io.Copy call. pipeStream uses it to
// collect the results of its two copy goroutines over a channel.
// It is built with positional literals, so field order matters.
type retio struct {
	// n is the number of bytes copied.
	n   int64
	// err is the error returned by the copy, if any.
	err error
}