Commit 21580ccd authored by Juan Batiz-Benet's avatar Juan Batiz-Benet
Browse files

swap net2 -> net

parent 08b8250c
...@@ -5,7 +5,7 @@ import ( ...@@ -5,7 +5,7 @@ import (
eventlog "github.com/jbenet/go-ipfs/util/eventlog" eventlog "github.com/jbenet/go-ipfs/util/eventlog"
inet "github.com/jbenet/go-ipfs/p2p/net2" inet "github.com/jbenet/go-ipfs/p2p/net"
peer "github.com/jbenet/go-ipfs/p2p/peer" peer "github.com/jbenet/go-ipfs/p2p/peer"
protocol "github.com/jbenet/go-ipfs/p2p/protocol" protocol "github.com/jbenet/go-ipfs/p2p/protocol"
identify "github.com/jbenet/go-ipfs/p2p/protocol/identify" identify "github.com/jbenet/go-ipfs/p2p/protocol/identify"
......
...@@ -5,7 +5,7 @@ import ( ...@@ -5,7 +5,7 @@ import (
"io" "io"
"testing" "testing"
inet "github.com/jbenet/go-ipfs/p2p/net2" inet "github.com/jbenet/go-ipfs/p2p/net"
protocol "github.com/jbenet/go-ipfs/p2p/protocol" protocol "github.com/jbenet/go-ipfs/p2p/protocol"
testutil "github.com/jbenet/go-ipfs/p2p/test/util" testutil "github.com/jbenet/go-ipfs/p2p/test/util"
......
...@@ -5,7 +5,7 @@ import ( ...@@ -5,7 +5,7 @@ import (
eventlog "github.com/jbenet/go-ipfs/util/eventlog" eventlog "github.com/jbenet/go-ipfs/util/eventlog"
inet "github.com/jbenet/go-ipfs/p2p/net2" inet "github.com/jbenet/go-ipfs/p2p/net"
peer "github.com/jbenet/go-ipfs/p2p/peer" peer "github.com/jbenet/go-ipfs/p2p/peer"
protocol "github.com/jbenet/go-ipfs/p2p/protocol" protocol "github.com/jbenet/go-ipfs/p2p/protocol"
) )
......
package backpressure_tests
import (
crand "crypto/rand"
"io"
"math/rand"
"testing"
"time"
inet "github.com/jbenet/go-ipfs/p2p/net"
netutil "github.com/jbenet/go-ipfs/p2p/net/swarmnet/util"
peer "github.com/jbenet/go-ipfs/p2p/peer"
eventlog "github.com/jbenet/go-ipfs/util/eventlog"
context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
)
var log = eventlog.Logger("backpressure")
// TestBackpressureStreamHandler tests whether mux handler
// ratelimiting works. Meaning, since the handler is sequential
// it should block senders.
//
// Important note: spdystream (which peerstream uses) has a set
// of n workers (n=spdsystream.FRAME_WORKERS) which handle new
// frames, including those starting new streams. So all of them
// can be in the handler at one time. Also, the sending side
// does not rate limit unless we call stream.Wait()
//
//
// Note: right now, this happens muxer-wide. the muxer should
// learn to flow control, so handlers cant block each other.
func TestBackpressureStreamHandler(t *testing.T) {
t.Skip(`Sadly, as cool as this test is, it doesn't work
Because spdystream doesnt handle stream open backpressure
well IMO. I'll see about rewriting that part when it becomes
a problem.
`)
// a number of concurrent request handlers
limit := 10
// our way to signal that we're done with 1 request
requestHandled := make(chan struct{})
// handler rate limiting
receiverRatelimit := make(chan struct{}, limit)
for i := 0; i < limit; i++ {
receiverRatelimit <- struct{}{}
}
// sender counter of successfully opened streams
senderOpened := make(chan struct{}, limit*100)
// sender signals it's done (errored out)
senderDone := make(chan struct{})
// the receiver handles requests with some rate limiting
receiver := func(s inet.Stream) {
log.Debug("receiver received a stream")
<-receiverRatelimit // acquire
go func() {
// our request handler. can do stuff here. we
// simulate something taking time by waiting
// on requestHandled
log.Error("request worker handling...")
<-requestHandled
log.Error("request worker done!")
receiverRatelimit <- struct{}{} // release
}()
}
// the sender opens streams as fast as possible
sender := func(net inet.Network, remote peer.ID) {
var s inet.Stream
var err error
defer func() {
t.Error(err)
log.Debug("sender error. exiting.")
senderDone <- struct{}{}
}()
for {
s, err = net.NewStream(inet.ProtocolTesting, remote)
if err != nil {
return
}
_ = s
// if err = s.SwarmStream().Stream().Wait(); err != nil {
// return
// }
// "count" another successfully opened stream
// (large buffer so shouldn't block in normal operation)
log.Debug("sender opened another stream!")
senderOpened <- struct{}{}
}
}
// count our senderOpened events
countStreamsOpenedBySender := func(min int) int {
opened := 0
for opened < min {
log.Debugf("countStreamsOpenedBySender got %d (min %d)", opened, min)
select {
case <-senderOpened:
opened++
case <-time.After(10 * time.Millisecond):
}
}
return opened
}
// count our received events
// waitForNReceivedStreams := func(n int) {
// for n > 0 {
// log.Debugf("waiting for %d received streams...", n)
// select {
// case <-receiverRatelimit:
// n--
// }
// }
// }
testStreamsOpened := func(expected int) {
log.Debugf("testing rate limited to %d streams", expected)
if n := countStreamsOpenedBySender(expected); n != expected {
t.Fatalf("rate limiting did not work :( -- %d != %d", expected, n)
}
}
// ok that's enough setup. let's do it!
ctx := context.Background()
n1 := netutil.GenNetwork(t, ctx)
n2 := netutil.GenNetwork(t, ctx)
// setup receiver handler
n1.SetHandler(inet.ProtocolTesting, receiver)
log.Debugf("dialing %s", n2.ListenAddresses())
if err := n1.DialPeer(ctx, n2.LocalPeer()); err != nil {
t.Fatalf("Failed to dial:", err)
}
// launch sender!
go sender(n2, n1.LocalPeer())
// ok, what do we expect to happen? the receiver should
// receive 10 requests and stop receiving, blocking the sender.
// we can test this by counting 10x senderOpened requests
<-senderOpened // wait for the sender to successfully open some.
testStreamsOpened(limit - 1)
// let's "handle" 3 requests.
<-requestHandled
<-requestHandled
<-requestHandled
// the sender should've now been able to open exactly 3 more.
testStreamsOpened(3)
// shouldn't have opened anything more
testStreamsOpened(0)
// let's "handle" 100 requests in batches of 5
for i := 0; i < 20; i++ {
<-requestHandled
<-requestHandled
<-requestHandled
<-requestHandled
<-requestHandled
testStreamsOpened(5)
}
// success!
// now for the sugar on top: let's tear down the receiver. it should
// exit the sender.
n1.Close()
// shouldn't have opened anything more
testStreamsOpened(0)
select {
case <-time.After(100 * time.Millisecond):
t.Error("receiver shutdown failed to exit sender")
case <-senderDone:
log.Info("handler backpressure works!")
}
}
// TestStBackpressureStreamWrite tests whether streams see proper
// backpressure when writing data over the network streams.
func TestStBackpressureStreamWrite(t *testing.T) {
// senderWrote signals that the sender wrote bytes to remote.
// the value is the count of bytes written.
senderWrote := make(chan int, 10000)
// sender signals it's done (errored out)
senderDone := make(chan struct{})
// writeStats lets us listen to all the writes and return
// how many happened and how much was written
writeStats := func() (int, int) {
writes := 0
bytes := 0
for {
select {
case n := <-senderWrote:
writes++
bytes = bytes + n
default:
log.Debugf("stats: sender wrote %d bytes, %d writes", bytes, writes)
return bytes, writes
}
}
}
// sender attempts to write as fast as possible, signaling on the
// completion of every write. This makes it possible to see how
// fast it's actually writing. We pair this with a receiver
// that waits for a signal to read.
sender := func(s inet.Stream) {
defer func() {
s.Close()
senderDone <- struct{}{}
}()
// ready a buffer of random data
buf := make([]byte, 65536)
crand.Read(buf)
for {
// send a randomly sized subchunk
from := rand.Intn(len(buf) / 2)
to := rand.Intn(len(buf) / 2)
sendbuf := buf[from : from+to]
n, err := s.Write(sendbuf)
if err != nil {
log.Debug("sender error. exiting:", err)
return
}
log.Debugf("sender wrote %d bytes", n)
senderWrote <- n
}
}
// receive a number of bytes from a stream.
// returns the number of bytes written.
receive := func(s inet.Stream, expect int) {
log.Debugf("receiver to read %d bytes", expect)
rbuf := make([]byte, expect)
n, err := io.ReadFull(s, rbuf)
if err != nil {
t.Error("read failed:", err)
}
if expect != n {
t.Error("read len differs: %d != %d", expect, n)
}
}
// ok let's do it!
// setup the networks
ctx := context.Background()
n1 := netutil.GenNetwork(t, ctx)
n2 := netutil.GenNetwork(t, ctx)
netutil.DivulgeAddresses(n1, n2)
netutil.DivulgeAddresses(n2, n1)
// setup sender handler on 1
n1.SetHandler(inet.ProtocolTesting, sender)
log.Debugf("dialing %s", n2.ListenAddresses())
if err := n1.DialPeer(ctx, n2.LocalPeer()); err != nil {
t.Fatalf("Failed to dial:", err)
}
// open a stream, from 2->1, this is our reader
s, err := n2.NewStream(inet.ProtocolTesting, n1.LocalPeer())
if err != nil {
t.Fatal(err)
}
// let's make sure r/w works.
testSenderWrote := func(bytesE int) {
bytesA, writesA := writeStats()
if bytesA != bytesE {
t.Errorf("numbers failed: %d =?= %d bytes, via %d writes", bytesA, bytesE, writesA)
}
}
// 500ms rounds of lockstep write + drain
roundsStart := time.Now()
roundsTotal := 0
for roundsTotal < (2 << 20) {
// let the sender fill its buffers, it will stop sending.
<-time.After(300 * time.Millisecond)
b, _ := writeStats()
testSenderWrote(0)
testSenderWrote(0)
// drain it all, wait again
receive(s, b)
roundsTotal = roundsTotal + b
}
roundsTime := time.Now().Sub(roundsStart)
// now read continously, while we measure stats.
stop := make(chan struct{})
contStart := time.Now()
go func() {
for {
select {
case <-stop:
return
default:
receive(s, 2<<15)
}
}
}()
contTotal := 0
for contTotal < (2 << 20) {
n := <-senderWrote
contTotal += n
}
stop <- struct{}{}
contTime := time.Now().Sub(contStart)
// now compare! continuous should've been faster AND larger
if roundsTime < contTime {
t.Error("continuous should have been faster")
}
if roundsTotal < contTotal {
t.Error("continuous should have been larger, too!")
}
// and a couple rounds more for good measure ;)
for i := 0; i < 3; i++ {
// let the sender fill its buffers, it will stop sending.
<-time.After(300 * time.Millisecond)
b, _ := writeStats()
testSenderWrote(0)
testSenderWrote(0)
// drain it all, wait again
receive(s, b)
}
// this doesn't work :(:
// // now for the sugar on top: let's tear down the receiver. it should
// // exit the sender.
// n1.Close()
// testSenderWrote(0)
// testSenderWrote(0)
// select {
// case <-time.After(2 * time.Second):
// t.Error("receiver shutdown failed to exit sender")
// case <-senderDone:
// log.Info("handler backpressure works!")
// }
}
...@@ -19,11 +19,6 @@ import ( ...@@ -19,11 +19,6 @@ import (
var log = eventlog.Logger("conn") var log = eventlog.Logger("conn")
const (
// MaxMessageSize is the size of the largest single message. (4MB)
MaxMessageSize = 1 << 22
)
// ReleaseBuffer puts the given byte array back into the buffer pool, // ReleaseBuffer puts the given byte array back into the buffer pool,
// first verifying that it is the correct size // first verifying that it is the correct size
func ReleaseBuffer(b []byte) { func ReleaseBuffer(b []byte) {
...@@ -48,15 +43,8 @@ func newSingleConn(ctx context.Context, local, remote peer.ID, maconn manet.Conn ...@@ -48,15 +43,8 @@ func newSingleConn(ctx context.Context, local, remote peer.ID, maconn manet.Conn
maconn: maconn, maconn: maconn,
msgrw: msgio.NewReadWriter(maconn), msgrw: msgio.NewReadWriter(maconn),
} }
log.Debugf("newSingleConn %p: %v to %v", conn, local, remote)
// version handshake
if err := Handshake1(ctx, conn); err != nil {
conn.Close()
return nil, fmt.Errorf("Handshake1 failed: %s", err)
}
log.Debugf("newSingleConn %p: %v to %v finished", conn, local, remote) log.Debugf("newSingleConn %p: %v to %v", conn, local, remote)
return conn, nil return conn, nil
} }
......
package conn
import (
"fmt"
handshake "github.com/jbenet/go-ipfs/p2p/net/handshake"
hspb "github.com/jbenet/go-ipfs/p2p/net/handshake/pb"
context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
ggprotoio "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/gogoprotobuf/io"
)
// Handshake1 exchanges local and remote versions and compares them
// closes remote and returns an error in case of major difference
func Handshake1(ctx context.Context, c Conn) error {
rpeer := c.RemotePeer()
lpeer := c.LocalPeer()
// setup up protobuf io
maxSize := 4096
r := ggprotoio.NewDelimitedReader(c, maxSize)
w := ggprotoio.NewDelimitedWriter(c)
localH := handshake.Handshake1Msg()
remoteH := new(hspb.Handshake1)
// send the outgoing handshake message
if err := w.WriteMsg(localH); err != nil {
return err
}
log.Debugf("%p sent my version (%s) to %s", c, localH, rpeer)
log.Event(ctx, "handshake1Sent", lpeer)
select {
case <-ctx.Done():
return ctx.Err()
default:
}
if err := r.ReadMsg(remoteH); err != nil {
return fmt.Errorf("could not receive remote version: %q", err)
}
log.Debugf("%p received remote version (%s) from %s", c, remoteH, rpeer)
log.Event(ctx, "handshake1Received", lpeer)
if err := handshake.Handshake1Compatible(localH, remoteH); err != nil {
log.Infof("%s (%s) incompatible version with %s (%s)", lpeer, localH, rpeer, remoteH)
return err
}
log.Debugf("%s version handshake compatible %s", lpeer, rpeer)
return nil
}
# IFPS Handshake
The IPFS Protocol Handshake is divided into three sequential steps
1. Version Handshake (`Hanshake1`)
2. Secure Channel (`NewSecureConn`)
3. Services (`Handshake3`)
Currently these parts currently happen sequentially (costing an awful 5 RTT),
but can be optimized to 2 RTT.
### Version Handshake
The Version Handshake ensures that nodes speaking to each other can interoperate.
They send each other protocol versions and ensure there is a match on the major
version (semver).
### Secure Channel
The second part exchanges keys and establishes a secure comm channel. This
follows ECDHE TLS, but *isn't* TLS. (why will be written up elsewhere).
### Services
The Services portion sends any additional information on nodes needed
by the nodes, e.g. Listen Address (the received address could be a Dial addr),
and later on can include Service listing (dht, exchange, ipns, etc).
/*
package handshake implements the ipfs handshake protocol
IPFS Handshake
The IPFS Protocol Handshake is divided into three sequential steps
1. Version Handshake (`Hanshake1`)
2. Secure Channel (`NewSecureConn`)
3. Services (`Handshake3`)
Currently these parts currently happen sequentially (costing an awful 5 RTT),
but can be optimized to 2 RTT.
Version Handshake
The Version Handshake ensures that nodes speaking to each other can interoperate.
They send each other protocol versions and ensure there is a match on the major
version (semver).
Secure Channel
The second part exchanges keys and establishes a secure comm channel. This
follows ECDHE TLS, but *isn't* TLS. (why will be written up elsewhere).
Services
The Services portion sends any additional information on nodes needed
by the nodes, e.g. Listen Address (the received address could be a Dial addr),
and later on can include Service listing (dht, exchange, ipns, etc).
*/
package handshake
package handshake
import (
"errors"
"fmt"
config "github.com/jbenet/go-ipfs/config"
pb "github.com/jbenet/go-ipfs/p2p/net/handshake/pb"
u "github.com/jbenet/go-ipfs/util"
semver "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/coreos/go-semver/semver"
)
var log = u.Logger("handshake")
// IpfsVersion holds the current protocol version for a client running this code
var IpfsVersion *semver.Version
var ClientVersion = "go-ipfs/" + config.CurrentVersionNumber
func init() {
var err error
IpfsVersion, err = semver.NewVersion("0.0.1")
if err != nil {
panic(fmt.Errorf("invalid protocol version: %v", err))
}
}
// Handshake1Msg returns the current protocol version as a protobuf message
func Handshake1Msg() *pb.Handshake1 {
return NewHandshake1(IpfsVersion.String(), ClientVersion)
}
// ErrVersionMismatch is returned when two clients don't share a protocol version
var ErrVersionMismatch = errors.New("protocol missmatch")
// Handshake1Compatible checks whether two versions are compatible
// returns nil if they are fine
func Handshake1Compatible(handshakeA, handshakeB *pb.Handshake1) error {
a, err := semver.NewVersion(*handshakeA.ProtocolVersion)
if err != nil {
return err
}
b, err := semver.NewVersion(*handshakeB.ProtocolVersion)
if err != nil {
return err
}
if a.Major != b.Major {
return ErrVersionMismatch
}
return nil
}
// NewHandshake1 creates a new Handshake1 from the two strings
func NewHandshake1(protoVer, agentVer string) *pb.Handshake1 {
if protoVer == "" {
protoVer = IpfsVersion.String()
}
if agentVer == "" {
agentVer = ClientVersion
}
return &pb.Handshake1{
ProtocolVersion: &protoVer,
AgentVersion: &agentVer,
}
}
package handshake
import "testing"
func TestH1Compatible(t *testing.T) {
tcases := []struct {
a, b string
expected error
}{
{"0.0.0", "0.0.0", nil},
{"1.0.0", "1.1.0", nil},
{"1.0.0", "1.0.1", nil},
{"0.0.0", "1.0.0", ErrVersionMismatch},
{"1.0.0", "0.0.0", ErrVersionMismatch},
}
for i, tcase := range tcases {
if Handshake1Compatible(NewHandshake1(tcase.a, ""), NewHandshake1(tcase.b, "")) != tcase.expected {
t.Fatalf("case[%d] failed", i)
}
}
}
PB = $(wildcard *.proto)
GO = $(PB:.proto=.pb.go)
all: $(GO)
%.pb.go: %.proto
protoc --gogo_out=. --proto_path=../../../../../../:/usr/local/opt/protobuf/include:. $<
clean:
rm *.pb.go
// Code generated by protoc-gen-gogo.
// source: handshake.proto
// DO NOT EDIT!
/*
Package handshake_pb is a generated protocol buffer package.
It is generated from these files:
handshake.proto
It has these top-level messages:
Handshake1
Handshake3
*/
package handshake_pb
import proto "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/gogoprotobuf/proto"
import json "encoding/json"
import math "math"
// Reference proto, json, and math imports to suppress error if they are not otherwise used.
var _ = proto.Marshal
var _ = &json.SyntaxError{}
var _ = math.Inf
// Handshake1 is delivered _before_ the secure channel is initialized
type Handshake1 struct {
// protocolVersion determines compatibility between peers
ProtocolVersion *string `protobuf:"bytes,1,opt,name=protocolVersion" json:"protocolVersion,omitempty"`
// agentVersion is like a UserAgent string in browsers, or client version in bittorrent
// includes the client name and client. e.g. "go-ipfs/0.1.0"
AgentVersion *string `protobuf:"bytes,2,opt,name=agentVersion" json:"agentVersion,omitempty"`
XXX_unrecognized []byte `json:"-"`
}
func (m *Handshake1) Reset() { *m = Handshake1{} }
func (m *Handshake1) String() string { return proto.CompactTextString(m) }
func (*Handshake1) ProtoMessage() {}
func (m *Handshake1) GetProtocolVersion() string {
if m != nil && m.ProtocolVersion != nil {
return *m.ProtocolVersion
}
return ""
}
func (m *Handshake1) GetAgentVersion() string {
if m != nil && m.AgentVersion != nil {
return *m.AgentVersion
}
return ""
}
// Handshake3 is delivered _after_ the secure channel is initialized
type Handshake3 struct {
// can include all the values in handshake1, for protocol version, etc.
H1 *Handshake1 `protobuf:"bytes,5,opt,name=h1" json:"h1,omitempty"`
// publicKey is this node's public key (which also gives its node.ID)
// - may not need to be sent, as secure channel implies it has been sent.
// - then again, if we change / disable secure channel, may still want it.
PublicKey []byte `protobuf:"bytes,1,opt,name=publicKey" json:"publicKey,omitempty"`
// listenAddrs are the multiaddrs the sender node listens for open connections on
ListenAddrs [][]byte `protobuf:"bytes,2,rep,name=listenAddrs" json:"listenAddrs,omitempty"`
// protocols are the services this node is running
Protocols []string `protobuf:"bytes,3,rep,name=protocols" json:"protocols,omitempty"`
// oservedAddr is the multiaddr of the remote endpoint that the sender node perceives
// this is useful information to convey to the other side, as it helps the remote endpoint
// determine whether its connection to the local peer goes through NAT.
ObservedAddr []byte `protobuf:"bytes,4,opt,name=observedAddr" json:"observedAddr,omitempty"`
XXX_unrecognized []byte `json:"-"`
}
func (m *Handshake3) Reset() { *m = Handshake3{} }
func (m *Handshake3) String() string { return proto.CompactTextString(m) }
func (*Handshake3) ProtoMessage() {}
func (m *Handshake3) GetH1() *Handshake1 {
if m != nil {
return m.H1
}
return nil
}
func (m *Handshake3) GetPublicKey() []byte {
if m != nil {
return m.PublicKey
}
return nil
}
func (m *Handshake3) GetListenAddrs() [][]byte {
if m != nil {
return m.ListenAddrs
}
return nil
}
func (m *Handshake3) GetProtocols() []string {
if m != nil {
return m.Protocols
}
return nil
}
func (m *Handshake3) GetObservedAddr() []byte {
if m != nil {
return m.ObservedAddr
}
return nil
}
func init() {
}
package handshake.pb;
//import "github.com/jbenet/go-ipfs/net/mux/mux.proto";
// Handshake1 is delivered _before_ the secure channel is initialized
message Handshake1 {
// protocolVersion determines compatibility between peers
optional string protocolVersion = 1; // semver
// agentVersion is like a UserAgent string in browsers, or client version in bittorrent
// includes the client name and client. e.g. "go-ipfs/0.1.0"
optional string agentVersion = 2; // semver
// we'll have more fields here later.
}
// Handshake3 is delivered _after_ the secure channel is initialized
message Handshake3 {
// can include all the values in handshake1, for protocol version, etc.
optional Handshake1 h1 = 5;
// publicKey is this node's public key (which also gives its node.ID)
// - may not need to be sent, as secure channel implies it has been sent.
// - then again, if we change / disable secure channel, may still want it.
optional bytes publicKey = 1;
// listenAddrs are the multiaddrs the sender node listens for open connections on
repeated bytes listenAddrs = 2;
// protocols are the services this node is running
repeated string protocols = 3;
// oservedAddr is the multiaddr of the remote endpoint that the sender node perceives
// this is useful information to convey to the other side, as it helps the remote endpoint
// determine whether its connection to the local peer goes through NAT.
optional bytes observedAddr = 4;
}
...@@ -4,7 +4,6 @@ import ( ...@@ -4,7 +4,6 @@ import (
"io" "io"
conn "github.com/jbenet/go-ipfs/p2p/net/conn" conn "github.com/jbenet/go-ipfs/p2p/net/conn"
// swarm "github.com/jbenet/go-ipfs/p2p/net/swarm2"
peer "github.com/jbenet/go-ipfs/p2p/peer" peer "github.com/jbenet/go-ipfs/p2p/peer"
context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context" context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
...@@ -12,20 +11,6 @@ import ( ...@@ -12,20 +11,6 @@ import (
ma "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr" ma "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr"
) )
// ProtocolID is an identifier used to write protocol headers in streams.
type ProtocolID string
// These are the ProtocolIDs of the protocols running. It is useful
// to keep them in one place.
const (
ProtocolTesting ProtocolID = "/ipfs/testing"
ProtocolBitswap ProtocolID = "/ipfs/bitswap"
ProtocolDHT ProtocolID = "/ipfs/dht"
ProtocolIdentify ProtocolID = "/ipfs/id"
ProtocolDiag ProtocolID = "/ipfs/diagnostics"
ProtocolRelay ProtocolID = "/ipfs/relay"
)
// MessageSizeMax is a soft (recommended) maximum for network messages. // MessageSizeMax is a soft (recommended) maximum for network messages.
// One can write more, as the interface is a stream. But it is useful // One can write more, as the interface is a stream. But it is useful
// to bunch it up into multiple read/writes when the whole message is // to bunch it up into multiple read/writes when the whole message is
...@@ -45,12 +30,10 @@ type Stream interface { ...@@ -45,12 +30,10 @@ type Stream interface {
Conn() Conn Conn() Conn
} }
// StreamHandler is the function protocols who wish to listen to // StreamHandler is the type of function used to listen for
// incoming streams must implement. // streams opened by the remote side.
type StreamHandler func(Stream) type StreamHandler func(Stream)
type StreamHandlerMap map[ProtocolID]StreamHandler
// Conn is a connection to a remote peer. It multiplexes streams. // Conn is a connection to a remote peer. It multiplexes streams.
// Usually there is no need to use a Conn directly, but it may // Usually there is no need to use a Conn directly, but it may
// be useful to get information about the peer on the other side: // be useful to get information about the peer on the other side:
...@@ -58,11 +41,15 @@ type StreamHandlerMap map[ProtocolID]StreamHandler ...@@ -58,11 +41,15 @@ type StreamHandlerMap map[ProtocolID]StreamHandler
type Conn interface { type Conn interface {
conn.PeerConn conn.PeerConn
// NewStreamWithProtocol constructs a new Stream over this conn. // NewStream constructs a new Stream over this conn.
NewStreamWithProtocol(pr ProtocolID) (Stream, error) NewStream() (Stream, error)
} }
// Network is the interface IPFS uses for connecting to the world. // ConnHandler is the type of function used to listen for
// connections opened by the remote side.
type ConnHandler func(Conn)
// Network is the interface used to connect to the outside world.
// It dials and listens for connections. it uses a Swarm to pool // It dials and listens for connections. it uses a Swarm to pool
// connnections (see swarm pkg, and peerstream.Swarm). Connections // connnections (see swarm pkg, and peerstream.Swarm). Connections
// are encrypted with a TLS-like protocol. // are encrypted with a TLS-like protocol.
...@@ -70,22 +57,17 @@ type Network interface { ...@@ -70,22 +57,17 @@ type Network interface {
Dialer Dialer
io.Closer io.Closer
// SetHandler sets the protocol handler on the Network's Muxer. // SetStreamHandler sets the handler for new streams opened by the
// This operation is threadsafe. // remote side. This operation is threadsafe.
SetHandler(ProtocolID, StreamHandler) SetStreamHandler(StreamHandler)
// Protocols returns the list of protocols this network currently // SetConnHandler sets the handler for new connections opened by the
// has registered handlers for. // remote side. This operation is threadsafe.
Protocols() []ProtocolID SetConnHandler(ConnHandler)
// NewStream returns a new stream to given peer p. // NewStream returns a new stream to given peer p.
// If there is no connection to p, attempts to create one. // If there is no connection to p, attempts to create one.
// If ProtocolID is "", writes no header. NewStream(peer.ID) (Stream, error)
NewStream(ProtocolID, peer.ID) (Stream, error)
// BandwidthTotals returns the total number of bytes passed through
// the network since it was instantiated
BandwidthTotals() (uint64, uint64)
// ListenAddresses returns a list of addresses at which this network listens. // ListenAddresses returns a list of addresses at which this network listens.
ListenAddresses() []ma.Multiaddr ListenAddresses() []ma.Multiaddr
...@@ -112,8 +94,8 @@ type Dialer interface { ...@@ -112,8 +94,8 @@ type Dialer interface {
// LocalPeer returns the local peer associated with this network // LocalPeer returns the local peer associated with this network
LocalPeer() peer.ID LocalPeer() peer.ID
// DialPeer attempts to establish a connection to a given peer // DialPeer establishes a connection to a given peer
DialPeer(context.Context, peer.ID) error DialPeer(context.Context, peer.ID) (Conn, error)
// ClosePeer closes the connection to a given peer // ClosePeer closes the connection to a given peer
ClosePeer(peer.ID) error ClosePeer(peer.ID) error
......
...@@ -11,6 +11,7 @@ import ( ...@@ -11,6 +11,7 @@ import (
"time" "time"
ic "github.com/jbenet/go-ipfs/p2p/crypto" ic "github.com/jbenet/go-ipfs/p2p/crypto"
host "github.com/jbenet/go-ipfs/p2p/host"
inet "github.com/jbenet/go-ipfs/p2p/net" inet "github.com/jbenet/go-ipfs/p2p/net"
peer "github.com/jbenet/go-ipfs/p2p/peer" peer "github.com/jbenet/go-ipfs/p2p/peer"
...@@ -20,16 +21,18 @@ import ( ...@@ -20,16 +21,18 @@ import (
type Mocknet interface { type Mocknet interface {
// GenPeer generates a peer and its inet.Network in the Mocknet // GenPeer generates a peer and its inet.Network in the Mocknet
GenPeer() (inet.Network, error) GenPeer() (host.Host, error)
// AddPeer adds an existing peer. we need both a privkey and addr. // AddPeer adds an existing peer. we need both a privkey and addr.
// ID is derived from PrivKey // ID is derived from PrivKey
AddPeer(ic.PrivKey, ma.Multiaddr) (inet.Network, error) AddPeer(ic.PrivKey, ma.Multiaddr) (host.Host, error)
// retrieve things (with randomized iteration order) // retrieve things (with randomized iteration order)
Peers() []peer.ID Peers() []peer.ID
Net(peer.ID) inet.Network Net(peer.ID) inet.Network
Nets() []inet.Network Nets() []inet.Network
Host(peer.ID) host.Host
Hosts() []host.Host
Links() LinkMap Links() LinkMap
LinksBetweenPeers(a, b peer.ID) []Link LinksBetweenPeers(a, b peer.ID) []Link
LinksBetweenNets(a, b inet.Network) []Link LinksBetweenNets(a, b inet.Network) []Link
...@@ -52,8 +55,8 @@ type Mocknet interface { ...@@ -52,8 +55,8 @@ type Mocknet interface {
// Connections are the usual. Connecting means Dialing. // Connections are the usual. Connecting means Dialing.
// **to succeed, peers must be linked beforehand** // **to succeed, peers must be linked beforehand**
ConnectPeers(peer.ID, peer.ID) error ConnectPeers(peer.ID, peer.ID) (inet.Conn, error)
ConnectNets(inet.Network, inet.Network) error ConnectNets(inet.Network, inet.Network) (inet.Conn, error)
DisconnectPeers(peer.ID, peer.ID) error DisconnectPeers(peer.ID, peer.ID) error
DisconnectNets(inet.Network, inet.Network) error DisconnectNets(inet.Network, inet.Network) error
} }
......
...@@ -53,7 +53,7 @@ func FullMeshConnected(ctx context.Context, n int) (Mocknet, error) { ...@@ -53,7 +53,7 @@ func FullMeshConnected(ctx context.Context, n int) (Mocknet, error) {
nets := m.Nets() nets := m.Nets()
for _, n1 := range nets { for _, n1 := range nets {
for _, n2 := range nets { for _, n2 := range nets {
if err := m.ConnectNets(n1, n2); err != nil { if _, err := m.ConnectNets(n1, n2); err != nil {
return nil, err return nil, err
} }
} }
......
...@@ -6,7 +6,6 @@ import ( ...@@ -6,7 +6,6 @@ import (
ic "github.com/jbenet/go-ipfs/p2p/crypto" ic "github.com/jbenet/go-ipfs/p2p/crypto"
inet "github.com/jbenet/go-ipfs/p2p/net" inet "github.com/jbenet/go-ipfs/p2p/net"
mux "github.com/jbenet/go-ipfs/p2p/net/services/mux"
peer "github.com/jbenet/go-ipfs/p2p/peer" peer "github.com/jbenet/go-ipfs/p2p/peer"
ma "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr" ma "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr"
...@@ -83,14 +82,10 @@ func (c *conn) openStream() *stream { ...@@ -83,14 +82,10 @@ func (c *conn) openStream() *stream {
return sl return sl
} }
func (c *conn) NewStreamWithProtocol(pr inet.ProtocolID) (inet.Stream, error) { func (c *conn) NewStream() (inet.Stream, error) {
log.Debugf("Conn.NewStreamWithProtocol: %s --> %s", c.local, c.remote) log.Debugf("Conn.NewStreamWithProtocol: %s --> %s", c.local, c.remote)
s := c.openStream() s := c.openStream()
if err := mux.WriteProtocolHeader(pr, s); err != nil {
s.Close()
return nil, err
}
return s, nil return s, nil
} }
......
...@@ -3,8 +3,11 @@ package mocknet ...@@ -3,8 +3,11 @@ package mocknet
import ( import (
"fmt" "fmt"
"sync" "sync"
"time"
ic "github.com/jbenet/go-ipfs/p2p/crypto" ic "github.com/jbenet/go-ipfs/p2p/crypto"
host "github.com/jbenet/go-ipfs/p2p/host"
bhost "github.com/jbenet/go-ipfs/p2p/host/basic"
inet "github.com/jbenet/go-ipfs/p2p/net" inet "github.com/jbenet/go-ipfs/p2p/net"
peer "github.com/jbenet/go-ipfs/p2p/peer" peer "github.com/jbenet/go-ipfs/p2p/peer"
testutil "github.com/jbenet/go-ipfs/util/testutil" testutil "github.com/jbenet/go-ipfs/util/testutil"
...@@ -16,9 +19,8 @@ import ( ...@@ -16,9 +19,8 @@ import (
// mocknet implements mocknet.Mocknet // mocknet implements mocknet.Mocknet
type mocknet struct { type mocknet struct {
// must map on peer.ID (instead of peer.ID) because nets map[peer.ID]*peernet
// each inet.Network has different peerstore hosts map[peer.ID]*bhost.BasicHost
nets map[peer.ID]*peernet
// links make it possible to connect two peers. // links make it possible to connect two peers.
// think of links as the physical medium. // think of links as the physical medium.
...@@ -35,33 +37,36 @@ type mocknet struct { ...@@ -35,33 +37,36 @@ type mocknet struct {
func New(ctx context.Context) Mocknet { func New(ctx context.Context) Mocknet {
return &mocknet{ return &mocknet{
nets: map[peer.ID]*peernet{}, nets: map[peer.ID]*peernet{},
hosts: map[peer.ID]*bhost.BasicHost{},
links: map[peer.ID]map[peer.ID]map[*link]struct{}{}, links: map[peer.ID]map[peer.ID]map[*link]struct{}{},
cg: ctxgroup.WithContext(ctx), cg: ctxgroup.WithContext(ctx),
} }
} }
func (mn *mocknet) GenPeer() (inet.Network, error) { func (mn *mocknet) GenPeer() (host.Host, error) {
sk, _, err := testutil.SeededKeyPair(int64(len(mn.nets))) sk, _, err := testutil.SeededKeyPair(time.Now().UnixNano())
if err != nil { if err != nil {
return nil, err return nil, err
} }
a := testutil.RandLocalTCPAddress() a := testutil.RandLocalTCPAddress()
n, err := mn.AddPeer(sk, a) h, err := mn.AddPeer(sk, a)
if err != nil { if err != nil {
return nil, err return nil, err
} }
return n, nil return h, nil
} }
func (mn *mocknet) AddPeer(k ic.PrivKey, a ma.Multiaddr) (inet.Network, error) { func (mn *mocknet) AddPeer(k ic.PrivKey, a ma.Multiaddr) (host.Host, error) {
n, err := newPeernet(mn.cg.Context(), mn, k, a) n, err := newPeernet(mn.cg.Context(), mn, k, a)
if err != nil { if err != nil {
return nil, err return nil, err
} }
h := bhost.New(n)
// make sure to add listening address! // make sure to add listening address!
// this makes debugging things simpler as remembering to register // this makes debugging things simpler as remembering to register
// an address may cause unexpected failure. // an address may cause unexpected failure.
...@@ -72,8 +77,9 @@ func (mn *mocknet) AddPeer(k ic.PrivKey, a ma.Multiaddr) (inet.Network, error) { ...@@ -72,8 +77,9 @@ func (mn *mocknet) AddPeer(k ic.PrivKey, a ma.Multiaddr) (inet.Network, error) {
mn.Lock() mn.Lock()
mn.nets[n.peer] = n mn.nets[n.peer] = n
mn.hosts[n.peer] = h
mn.Unlock() mn.Unlock()
return n, nil return h, nil
} }
func (mn *mocknet) Peers() []peer.ID { func (mn *mocknet) Peers() []peer.ID {
...@@ -87,16 +93,29 @@ func (mn *mocknet) Peers() []peer.ID { ...@@ -87,16 +93,29 @@ func (mn *mocknet) Peers() []peer.ID {
return cp return cp
} }
func (mn *mocknet) Host(pid peer.ID) host.Host {
mn.RLock()
host := mn.hosts[pid]
mn.RUnlock()
return host
}
func (mn *mocknet) Net(pid peer.ID) inet.Network { func (mn *mocknet) Net(pid peer.ID) inet.Network {
mn.RLock()
n := mn.nets[pid]
mn.RUnlock()
return n
}
func (mn *mocknet) Hosts() []host.Host {
mn.RLock() mn.RLock()
defer mn.RUnlock() defer mn.RUnlock()
for _, n := range mn.nets { cp := make([]host.Host, 0, len(mn.hosts))
if n.peer == pid { for _, h := range mn.hosts {
return n cp = append(cp, h)
}
} }
return nil return cp
} }
func (mn *mocknet) Nets() []inet.Network { func (mn *mocknet) Nets() []inet.Network {
...@@ -269,7 +288,7 @@ func (mn *mocknet) ConnectAll() error { ...@@ -269,7 +288,7 @@ func (mn *mocknet) ConnectAll() error {
continue continue
} }
if err := mn.ConnectNets(n1, n2); err != nil { if _, err := mn.ConnectNets(n1, n2); err != nil {
return err return err
} }
} }
...@@ -277,11 +296,11 @@ func (mn *mocknet) ConnectAll() error { ...@@ -277,11 +296,11 @@ func (mn *mocknet) ConnectAll() error {
return nil return nil
} }
func (mn *mocknet) ConnectPeers(a, b peer.ID) error { func (mn *mocknet) ConnectPeers(a, b peer.ID) (inet.Conn, error) {
return mn.Net(a).DialPeer(mn.cg.Context(), b) return mn.Net(a).DialPeer(mn.cg.Context(), b)
} }
func (mn *mocknet) ConnectNets(a, b inet.Network) error { func (mn *mocknet) ConnectNets(a, b inet.Network) (inet.Conn, error) {
return a.DialPeer(mn.cg.Context(), b.LocalPeer()) return a.DialPeer(mn.cg.Context(), b.LocalPeer())
} }
......
...@@ -7,9 +7,6 @@ import ( ...@@ -7,9 +7,6 @@ import (
ic "github.com/jbenet/go-ipfs/p2p/crypto" ic "github.com/jbenet/go-ipfs/p2p/crypto"
inet "github.com/jbenet/go-ipfs/p2p/net" inet "github.com/jbenet/go-ipfs/p2p/net"
ids "github.com/jbenet/go-ipfs/p2p/net/services/identify"
mux "github.com/jbenet/go-ipfs/p2p/net/services/mux"
relay "github.com/jbenet/go-ipfs/p2p/net/services/relay"
peer "github.com/jbenet/go-ipfs/p2p/peer" peer "github.com/jbenet/go-ipfs/p2p/peer"
context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context" context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
...@@ -30,10 +27,9 @@ type peernet struct { ...@@ -30,10 +27,9 @@ type peernet struct {
connsByPeer map[peer.ID]map[*conn]struct{} connsByPeer map[peer.ID]map[*conn]struct{}
connsByLink map[*link]map[*conn]struct{} connsByLink map[*link]map[*conn]struct{}
// needed to implement inet.Network // implement inet.Network
mux mux.Mux streamHandler inet.StreamHandler
ids *ids.IDService connHandler inet.ConnHandler
relay *relay.RelayService
cg ctxgroup.ContextGroup cg ctxgroup.ContextGroup
sync.RWMutex sync.RWMutex
...@@ -58,7 +54,6 @@ func newPeernet(ctx context.Context, m *mocknet, k ic.PrivKey, ...@@ -58,7 +54,6 @@ func newPeernet(ctx context.Context, m *mocknet, k ic.PrivKey,
mocknet: m, mocknet: m,
peer: p, peer: p,
ps: ps, ps: ps,
mux: mux.Mux{Handlers: inet.StreamHandlerMap{}},
cg: ctxgroup.WithContext(ctx), cg: ctxgroup.WithContext(ctx),
connsByPeer: map[peer.ID]map[*conn]struct{}{}, connsByPeer: map[peer.ID]map[*conn]struct{}{},
...@@ -66,14 +61,6 @@ func newPeernet(ctx context.Context, m *mocknet, k ic.PrivKey, ...@@ -66,14 +61,6 @@ func newPeernet(ctx context.Context, m *mocknet, k ic.PrivKey,
} }
n.cg.SetTeardown(n.teardown) n.cg.SetTeardown(n.teardown)
// setup a conn handler that immediately "asks the other side about them"
// this is ProtocolIdentify.
n.ids = ids.NewIDService(n)
// setup ProtocolRelay to allow traffic relaying.
// Feed things we get for ourselves into the muxer.
n.relay = relay.NewRelayService(n.cg.Context(), n, n.mux.HandleSync)
return n, nil return n, nil
} }
...@@ -104,10 +91,6 @@ func (pn *peernet) Close() error { ...@@ -104,10 +91,6 @@ func (pn *peernet) Close() error {
return pn.cg.Close() return pn.cg.Close()
} }
func (pn *peernet) Protocols() []inet.ProtocolID {
return pn.mux.Protocols()
}
func (pn *peernet) Peerstore() peer.Peerstore { func (pn *peernet) Peerstore() peer.Peerstore {
return pn.ps return pn.ps
} }
...@@ -116,24 +99,41 @@ func (pn *peernet) String() string { ...@@ -116,24 +99,41 @@ func (pn *peernet) String() string {
return fmt.Sprintf("<mock.peernet %s - %d conns>", pn.peer, len(pn.allConns())) return fmt.Sprintf("<mock.peernet %s - %d conns>", pn.peer, len(pn.allConns()))
} }
// handleNewStream is an internal function to trigger the muxer handler // handleNewStream is an internal function to trigger the client's handler
func (pn *peernet) handleNewStream(s inet.Stream) { func (pn *peernet) handleNewStream(s inet.Stream) {
go pn.mux.Handle(s) pn.RLock()
handler := pn.streamHandler
pn.RUnlock()
if handler != nil {
go handler(s)
}
}
// handleNewConn is an internal function to trigger the client's handler
func (pn *peernet) handleNewConn(c inet.Conn) {
pn.RLock()
handler := pn.connHandler
pn.RUnlock()
if handler != nil {
go handler(c)
}
} }
// DialPeer attempts to establish a connection to a given peer. // DialPeer attempts to establish a connection to a given peer.
// Respects the context. // Respects the context.
func (pn *peernet) DialPeer(ctx context.Context, p peer.ID) error { func (pn *peernet) DialPeer(ctx context.Context, p peer.ID) (inet.Conn, error) {
return pn.connect(p) return pn.connect(p)
} }
func (pn *peernet) connect(p peer.ID) error { func (pn *peernet) connect(p peer.ID) (*conn, error) {
// first, check if we already have live connections // first, check if we already have live connections
pn.RLock() pn.RLock()
cs, found := pn.connsByPeer[p] cs, found := pn.connsByPeer[p]
pn.RUnlock() pn.RUnlock()
if found && len(cs) > 0 { if found && len(cs) > 0 {
return nil for c := range cs {
return c, nil
}
} }
log.Debugf("%s (newly) dialing %s", pn.peer, p) log.Debugf("%s (newly) dialing %s", pn.peer, p)
...@@ -141,7 +141,7 @@ func (pn *peernet) connect(p peer.ID) error { ...@@ -141,7 +141,7 @@ func (pn *peernet) connect(p peer.ID) error {
// ok, must create a new connection. we need a link // ok, must create a new connection. we need a link
links := pn.mocknet.LinksBetweenPeers(pn.peer, p) links := pn.mocknet.LinksBetweenPeers(pn.peer, p)
if len(links) < 1 { if len(links) < 1 {
return fmt.Errorf("%s cannot connect to %s", pn.peer, p) return nil, fmt.Errorf("%s cannot connect to %s", pn.peer, p)
} }
// if many links found, how do we select? for now, randomly... // if many links found, how do we select? for now, randomly...
...@@ -151,8 +151,8 @@ func (pn *peernet) connect(p peer.ID) error { ...@@ -151,8 +151,8 @@ func (pn *peernet) connect(p peer.ID) error {
log.Debugf("%s dialing %s openingConn", pn.peer, p) log.Debugf("%s dialing %s openingConn", pn.peer, p)
// create a new connection with link // create a new connection with link
pn.openConn(p, l.(*link)) c := pn.openConn(p, l.(*link))
return nil return c, nil
} }
func (pn *peernet) openConn(r peer.ID, l *link) *conn { func (pn *peernet) openConn(r peer.ID, l *link) *conn {
...@@ -166,16 +166,15 @@ func (pn *peernet) openConn(r peer.ID, l *link) *conn { ...@@ -166,16 +166,15 @@ func (pn *peernet) openConn(r peer.ID, l *link) *conn {
func (pn *peernet) remoteOpenedConn(c *conn) { func (pn *peernet) remoteOpenedConn(c *conn) {
log.Debugf("%s accepting connection from %s", pn.LocalPeer(), c.RemotePeer()) log.Debugf("%s accepting connection from %s", pn.LocalPeer(), c.RemotePeer())
pn.addConn(c) pn.addConn(c)
pn.handleNewConn(c)
} }
// addConn constructs and adds a connection // addConn constructs and adds a connection
// to given remote peer over given link // to given remote peer over given link
func (pn *peernet) addConn(c *conn) { func (pn *peernet) addConn(c *conn) {
// run the Identify protocol/handshake.
pn.ids.IdentifyConn(c)
pn.Lock() pn.Lock()
defer pn.Unlock()
cs, found := pn.connsByPeer[c.RemotePeer()] cs, found := pn.connsByPeer[c.RemotePeer()]
if !found { if !found {
cs = map[*conn]struct{}{} cs = map[*conn]struct{}{}
...@@ -189,7 +188,6 @@ func (pn *peernet) addConn(c *conn) { ...@@ -189,7 +188,6 @@ func (pn *peernet) addConn(c *conn) {
pn.connsByLink[c.link] = cs pn.connsByLink[c.link] = cs
} }
pn.connsByLink[c.link][c] = struct{}{} pn.connsByLink[c.link][c] = struct{}{}
pn.Unlock()
} }
// removeConn removes a given conn // removeConn removes a given conn
...@@ -314,15 +312,14 @@ func (pn *peernet) Connectedness(p peer.ID) inet.Connectedness { ...@@ -314,15 +312,14 @@ func (pn *peernet) Connectedness(p peer.ID) inet.Connectedness {
// NewStream returns a new stream to given peer p. // NewStream returns a new stream to given peer p.
// If there is no connection to p, attempts to create one. // If there is no connection to p, attempts to create one.
// If ProtocolID is "", writes no header. func (pn *peernet) NewStream(p peer.ID) (inet.Stream, error) {
func (pn *peernet) NewStream(pr inet.ProtocolID, p peer.ID) (inet.Stream, error) {
pn.Lock() pn.Lock()
defer pn.Unlock()
cs, found := pn.connsByPeer[p] cs, found := pn.connsByPeer[p]
if !found || len(cs) < 1 { if !found || len(cs) < 1 {
pn.Unlock()
return nil, fmt.Errorf("no connection to peer") return nil, fmt.Errorf("no connection to peer")
} }
pn.Unlock()
// if many conns are found, how do we select? for now, randomly... // if many conns are found, how do we select? for now, randomly...
// this would be an interesting place to test logic that can measure // this would be an interesting place to test logic that can measure
...@@ -336,15 +333,21 @@ func (pn *peernet) NewStream(pr inet.ProtocolID, p peer.ID) (inet.Stream, error) ...@@ -336,15 +333,21 @@ func (pn *peernet) NewStream(pr inet.ProtocolID, p peer.ID) (inet.Stream, error)
n-- n--
} }
return c.NewStreamWithProtocol(pr) return c.NewStream()
} }
// SetHandler sets the protocol handler on the Network's Muxer. // SetStreamHandler sets the new stream handler on the Network.
// This operation is threadsafe. // This operation is threadsafe.
func (pn *peernet) SetHandler(p inet.ProtocolID, h inet.StreamHandler) { func (pn *peernet) SetStreamHandler(h inet.StreamHandler) {
pn.mux.SetHandler(p, h) pn.Lock()
pn.streamHandler = h
pn.Unlock()
} }
func (pn *peernet) IdentifyProtocol() *ids.IDService { // SetConnHandler sets the new conn handler on the Network.
return pn.ids // This operation is threadsafe.
func (pn *peernet) SetConnHandler(h inet.ConnHandler) {
pn.Lock()
pn.connHandler = h
pn.Unlock()
} }
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment