swarm_test.go 5.81 KB
Newer Older
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
1
2
3
4
package swarm

import (
	"bytes"
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
5
	"fmt"
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
6
7
8
9
10
	"io"
	"sync"
	"testing"
	"time"

Jeromy's avatar
Jeromy committed
11
	metrics "github.com/ipfs/go-ipfs/metrics"
12
13
14
	inet "github.com/ipfs/go-ipfs/p2p/net"
	peer "github.com/ipfs/go-ipfs/p2p/peer"
	testutil "github.com/ipfs/go-ipfs/util/testutil"
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
15

16
17
	ma "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr"
	context "github.com/ipfs/go-ipfs/Godeps/_workspace/src/golang.org/x/net/context"
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
18
19
)

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
20
func EchoStreamHandler(stream inet.Stream) {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
21
22
23
24
	go func() {
		defer stream.Close()

		// pull out the ipfs conn
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
25
		c := stream.Conn()
26
		log.Infof("%s ponging to %s", c.LocalPeer(), c.RemotePeer())
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
27
28
29
30
31
32

		buf := make([]byte, 4)

		for {
			if _, err := stream.Read(buf); err != nil {
				if err != io.EOF {
33
					log.Info("ping receive error:", err)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
34
35
36
37
38
				}
				return
			}

			if !bytes.Equal(buf, []byte("ping")) {
39
				log.Infof("ping receive error: ping != %s %v", buf, buf)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
40
41
42
43
				return
			}

			if _, err := stream.Write([]byte("pong")); err != nil {
44
				log.Info("pond send error:", err)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
45
46
47
48
49
50
				return
			}
		}
	}()
}

51
func makeSwarms(ctx context.Context, t *testing.T, num int) []*Swarm {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
52
53
54
55
56
57
58
59
60
	swarms := make([]*Swarm, 0, num)

	for i := 0; i < num; i++ {
		localnp := testutil.RandPeerNetParamsOrFatal(t)

		peerstore := peer.NewPeerstore()
		peerstore.AddPubKey(localnp.ID, localnp.PubKey)
		peerstore.AddPrivKey(localnp.ID, localnp.PrivKey)

61
		addrs := []ma.Multiaddr{localnp.Addr}
Jeromy's avatar
Jeromy committed
62
		swarm, err := NewSwarm(ctx, addrs, localnp.ID, peerstore, metrics.NewBandwidthCounter())
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
63
64
65
66
67
68
69
70
		if err != nil {
			t.Fatal(err)
		}

		swarm.SetStreamHandler(EchoStreamHandler)
		swarms = append(swarms, swarm)
	}

71
	return swarms
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
72
73
}

74
func connectSwarms(t *testing.T, ctx context.Context, swarms []*Swarm) {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
75
76
77
78

	var wg sync.WaitGroup
	connect := func(s *Swarm, dst peer.ID, addr ma.Multiaddr) {
		// TODO: make a DialAddr func.
79
		s.peers.AddAddr(dst, addr, peer.PermanentAddrTTL)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
80
81
82
83
84
85
86
		if _, err := s.Dial(ctx, dst); err != nil {
			t.Fatal("error swarm dialing to peer", err)
		}
		wg.Done()
	}

	log.Info("Connecting swarms simultaneously.")
87
88
89
	for _, s1 := range swarms {
		for _, s2 := range swarms {
			if s2.local != s1.local { // don't connect to self.
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
90
				wg.Add(1)
91
				connect(s1, s2.LocalPeer(), s2.ListenAddresses()[0]) // try the first.
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
92
93
94
95
96
97
98
99
100
101
102
103
104
105
			}
		}
	}
	wg.Wait()

	for _, s := range swarms {
		log.Infof("%s swarm routing table: %s", s.local, s.Peers())
	}
}

func SubtestSwarm(t *testing.T, SwarmNum int, MsgNum int) {
	// t.Skip("skipping for another test")

	ctx := context.Background()
106
	swarms := makeSwarms(ctx, t, SwarmNum)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
107
108

	// connect everyone
109
	connectSwarms(t, ctx, swarms)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
110
111
112
113
114
115
116
117
118

	// ping/pong
	for _, s1 := range swarms {
		log.Debugf("-------------------------------------------------------")
		log.Debugf("%s ping pong round", s1.local)
		log.Debugf("-------------------------------------------------------")

		_, cancel := context.WithCancel(ctx)
		got := map[peer.ID]int{}
119
		errChan := make(chan error, MsgNum*len(swarms))
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
120
121
122
123
124
125
126
127
128
129
130
131
132
		streamChan := make(chan *Stream, MsgNum)

		// send out "ping" x MsgNum to every peer
		go func() {
			defer close(streamChan)

			var wg sync.WaitGroup
			send := func(p peer.ID) {
				defer wg.Done()

				// first, one stream per peer (nice)
				stream, err := s1.NewStreamWithPeer(p)
				if err != nil {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
133
					errChan <- err
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
					return
				}

				// send out ping!
				for k := 0; k < MsgNum; k++ { // with k messages
					msg := "ping"
					log.Debugf("%s %s %s (%d)", s1.local, msg, p, k)
					if _, err := stream.Write([]byte(msg)); err != nil {
						errChan <- err
						continue
					}
				}

				// read it later
				streamChan <- stream
			}

151
152
			for _, s2 := range swarms {
				if s2.local == s1.local {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
153
154
155
156
					continue // dont send to self...
				}

				wg.Add(1)
157
				go send(s2.local)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
158
159
160
161
162
163
164
165
			}
			wg.Wait()
		}()

		// receive "pong" x MsgNum from every peer
		go func() {
			defer close(errChan)
			count := 0
166
			countShouldBe := MsgNum * (len(swarms) - 1)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
167
168
169
170
171
172
173
174
175
176
177
178
179
			for stream := range streamChan { // one per peer
				defer stream.Close()

				// get peer on the other side
				p := stream.Conn().RemotePeer()

				// receive pings
				msgCount := 0
				msg := make([]byte, 4)
				for k := 0; k < MsgNum; k++ { // with k messages

					// read from the stream
					if _, err := stream.Read(msg); err != nil {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
180
						errChan <- err
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
181
182
183
184
						continue
					}

					if string(msg) != "pong" {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
185
						errChan <- fmt.Errorf("unexpected message: %s", msg)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
186
187
188
189
190
191
192
193
194
195
196
197
						continue
					}

					log.Debugf("%s %s %s (%d)", s1.local, msg, p, k)
					msgCount++
				}

				got[p] = msgCount
				count += msgCount
			}

			if count != countShouldBe {
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
198
				errChan <- fmt.Errorf("count mismatch: %d != %d", count, countShouldBe)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
199
200
201
202
203
204
205
206
207
208
209
			}
		}()

		// check any errors (blocks till consumer is done)
		for err := range errChan {
			if err != nil {
				t.Error(err.Error())
			}
		}

		log.Debugf("%s got pongs", s1.local)
210
211
		if (len(swarms) - 1) != len(got) {
			t.Errorf("got (%d) less messages than sent (%d).", len(got), len(swarms))
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
		}

		for p, n := range got {
			if n != MsgNum {
				t.Error("peer did not get all msgs", p, n, "/", MsgNum)
			}
		}

		cancel()
		<-time.After(10 * time.Millisecond)
	}

	for _, s := range swarms {
		s.Close()
	}
}

func TestSwarm(t *testing.T) {
	// t.Skip("skipping for another test")
231
	t.Parallel()
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
232
233
234
235
236
237
238
239
240

	// msgs := 1000
	msgs := 100
	swarms := 5
	SubtestSwarm(t, swarms, msgs)
}

func TestConnHandler(t *testing.T) {
	// t.Skip("skipping for another test")
241
	t.Parallel()
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
242
243

	ctx := context.Background()
244
	swarms := makeSwarms(ctx, t, 5)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
245
246
247
248
249
250

	gotconn := make(chan struct{}, 10)
	swarms[0].SetConnHandler(func(conn *Conn) {
		gotconn <- struct{}{}
	})

251
	connectSwarms(t, ctx, swarms)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272

	<-time.After(time.Millisecond)
	// should've gotten 5 by now.

	swarms[0].SetConnHandler(nil)

	expect := 4
	for i := 0; i < expect; i++ {
		select {
		case <-time.After(time.Second):
			t.Fatal("failed to get connections")
		case <-gotconn:
		}
	}

	select {
	case <-gotconn:
		t.Fatalf("should have connected to %d swarms", expect)
	default:
	}
}