Skip to content
GitLab
Menu
Projects
Groups
Snippets
Help
Projects
Groups
Snippets
Loading...
Help
Help
Support
Community forum
Keyboard shortcuts
?
Submit feedback
Sign in / Register
Toggle navigation
Menu
Open sidebar
adam.huang
go-libp2p
Commits
0f3ffb2d
Commit
0f3ffb2d
authored
8 years ago
by
Jeromy
Browse files
Options
Download
Email Patches
Plain Diff
extract conn, addr-util, and testutil
parent
0aaec876
master
2018-Q4-OKR
docs-improvements
feat/p2p-multiaddr
feat/pnet/working3
feat/protobuf
feat/relay-integrate
feat/udp
feature/standardize-readme
fix/473
fix/no-custom-field
fix/reset-ping-stream
fix/revert-correct-external-addr
gx/update-jccl6u
gx/update-nza0mn
jenkinsfile
kevina/fix-go-vet
multistream-ping
punching
revert-276-update-go-detect-race
v6.0.23
v6.0.22
v6.0.21
v6.0.20
v6.0.19
v6.0.18
v6.0.17
v6.0.16
v6.0.15
v6.0.14
v6.0.13
v6.0.12
v6.0.11
v6.0.10
v6.0.9
v6.0.8
v6.0.7
v6.0.6
v6.0.5
v6.0.4
v6.0.3
v6.0.2
v6.0.1
v6.0.0
v5.0.21
v5.0.20
v5.0.19
v5.0.18
v5.0.17
v5.0.16
v5.0.15
v5.0.14
v5.0.13
v5.0.12
v5.0.11
v5.0.10
v5.0.9
v5.0.8
v5.0.7
v5.0.6
v5.0.5
v5.0.4
v5.0.3
v5.0.2
v5.0.1
v5.0.0
v4.5.5
v4.5.4
v4.5.3
v4.5.2
v4.5.1
v4.5.0
v4.4.5
v4.4.4
v4.4.3
v4.4.2
v4.4.1
v4.4.0
v4.3.12
v4.3.11
v4.3.10
v4.3.9
v4.3.8
v4.3.7
v4.3.6
v4.3.5
v4.3.4
v4.3.3
v4.3.2
v4.3.1
v4.3.0
v4.2.0
v4.1.0
v4.0.4
v4.0.3
v4.0.2
v4.0.1
v4.0.0
No related merge requests found
Changes
38
Hide whitespace changes
Inline
Side-by-side
Showing
20 changed files
examples/hosts/main.go
+3
-3
examples/hosts/main.go
p2p/net/conn/conn.go
+0
-145
p2p/net/conn/conn.go
p2p/net/conn/conn_test.go
+0
-138
p2p/net/conn/conn_test.go
p2p/net/conn/dial.go
+0
-219
p2p/net/conn/dial.go
p2p/net/conn/dial_test.go
+0
-649
p2p/net/conn/dial_test.go
p2p/net/conn/interface.go
+0
-99
p2p/net/conn/interface.go
p2p/net/conn/listen.go
+0
-235
p2p/net/conn/listen.go
p2p/net/conn/secure_conn.go
+0
-124
p2p/net/conn/secure_conn.go
p2p/net/conn/secure_conn_test.go
+0
-211
p2p/net/conn/secure_conn_test.go
p2p/net/interface.go
+1
-1
p2p/net/interface.go
p2p/net/mock/mock_net.go
+2
-2
p2p/net/mock/mock_net.go
p2p/net/mock/mock_test.go
+4
-4
p2p/net/mock/mock_test.go
p2p/net/swarm/addr/addr.go
+0
-284
p2p/net/swarm/addr/addr.go
p2p/net/swarm/addr/addr_test.go
+0
-227
p2p/net/swarm/addr/addr_test.go
p2p/net/swarm/addr/filter.go
+0
-31
p2p/net/swarm/addr/filter.go
p2p/net/swarm/dial_test.go
+4
-5
p2p/net/swarm/dial_test.go
p2p/net/swarm/limiter.go
+2
-3
p2p/net/swarm/limiter.go
p2p/net/swarm/limiter_test.go
+1
-2
p2p/net/swarm/limiter_test.go
p2p/net/swarm/simul_test.go
+2
-3
p2p/net/swarm/simul_test.go
p2p/net/swarm/swarm.go
+2
-2
p2p/net/swarm/swarm.go
with
21 additions
and
2387 deletions
+21
-2387
examples/hosts/main.go
View file @
0f3ffb2d
...
...
@@ -8,16 +8,16 @@ import (
"log"
"strings"
peer
"github.com/ipfs/go-libp2p-peer"
pstore
"github.com/ipfs/go-libp2p-peerstore"
host
"github.com/libp2p/go-libp2p/p2p/host"
bhost
"github.com/libp2p/go-libp2p/p2p/host/basic"
inet
"github.com/libp2p/go-libp2p/p2p/net"
net
"github.com/libp2p/go-libp2p/p2p/net"
swarm
"github.com/libp2p/go-libp2p/p2p/net/swarm"
testutil
"github.com/libp2p/go-libp2p/testutil"
peer
"github.com/ipfs/go-libp2p-peer"
pstore
"github.com/ipfs/go-libp2p-peerstore"
ma
"github.com/jbenet/go-multiaddr"
testutil
"github.com/libp2p/go-testutil"
)
// create a 'Host' with a random peer to listen on the given address
...
...
This diff is collapsed.
Click to expand it.
p2p/net/conn/conn.go
deleted
100644 → 0
View file @
0aaec876
package
conn
import
(
"context"
"fmt"
"io"
"net"
"time"
u
"github.com/ipfs/go-ipfs-util"
ic
"github.com/ipfs/go-libp2p-crypto"
lgbl
"github.com/ipfs/go-libp2p-loggables"
peer
"github.com/ipfs/go-libp2p-peer"
logging
"github.com/ipfs/go-log"
mpool
"github.com/jbenet/go-msgio/mpool"
ma
"github.com/jbenet/go-multiaddr"
manet
"github.com/jbenet/go-multiaddr-net"
)
var
log
=
logging
.
Logger
(
"conn"
)
// ReleaseBuffer puts the given byte array back into the buffer pool,
// first verifying that it is the correct size
func
ReleaseBuffer
(
b
[]
byte
)
{
log
.
Debugf
(
"Releasing buffer! (cap,size = %d, %d)"
,
cap
(
b
),
len
(
b
))
mpool
.
ByteSlicePool
.
Put
(
uint32
(
cap
(
b
)),
b
)
}
// singleConn represents a single connection to another Peer (IPFS Node).
type
singleConn
struct
{
local
peer
.
ID
remote
peer
.
ID
maconn
manet
.
Conn
event
io
.
Closer
}
// newConn constructs a new connection
func
newSingleConn
(
ctx
context
.
Context
,
local
,
remote
peer
.
ID
,
maconn
manet
.
Conn
)
(
Conn
,
error
)
{
ml
:=
lgbl
.
Dial
(
"conn"
,
local
,
remote
,
maconn
.
LocalMultiaddr
(),
maconn
.
RemoteMultiaddr
())
conn
:=
&
singleConn
{
local
:
local
,
remote
:
remote
,
maconn
:
maconn
,
event
:
log
.
EventBegin
(
ctx
,
"connLifetime"
,
ml
),
}
log
.
Debugf
(
"newSingleConn %p: %v to %v"
,
conn
,
local
,
remote
)
return
conn
,
nil
}
// close is the internal close function, called by ContextCloser.Close
func
(
c
*
singleConn
)
Close
()
error
{
defer
func
()
{
if
c
.
event
!=
nil
{
c
.
event
.
Close
()
c
.
event
=
nil
}
}()
// close underlying connection
return
c
.
maconn
.
Close
()
}
// ID is an identifier unique to this connection.
func
(
c
*
singleConn
)
ID
()
string
{
return
ID
(
c
)
}
func
(
c
*
singleConn
)
String
()
string
{
return
String
(
c
,
"singleConn"
)
}
func
(
c
*
singleConn
)
LocalAddr
()
net
.
Addr
{
return
c
.
maconn
.
LocalAddr
()
}
func
(
c
*
singleConn
)
RemoteAddr
()
net
.
Addr
{
return
c
.
maconn
.
RemoteAddr
()
}
func
(
c
*
singleConn
)
LocalPrivateKey
()
ic
.
PrivKey
{
return
nil
}
func
(
c
*
singleConn
)
RemotePublicKey
()
ic
.
PubKey
{
return
nil
}
func
(
c
*
singleConn
)
SetDeadline
(
t
time
.
Time
)
error
{
return
c
.
maconn
.
SetDeadline
(
t
)
}
func
(
c
*
singleConn
)
SetReadDeadline
(
t
time
.
Time
)
error
{
return
c
.
maconn
.
SetReadDeadline
(
t
)
}
func
(
c
*
singleConn
)
SetWriteDeadline
(
t
time
.
Time
)
error
{
return
c
.
maconn
.
SetWriteDeadline
(
t
)
}
// LocalMultiaddr is the Multiaddr on this side
func
(
c
*
singleConn
)
LocalMultiaddr
()
ma
.
Multiaddr
{
return
c
.
maconn
.
LocalMultiaddr
()
}
// RemoteMultiaddr is the Multiaddr on the remote side
func
(
c
*
singleConn
)
RemoteMultiaddr
()
ma
.
Multiaddr
{
return
c
.
maconn
.
RemoteMultiaddr
()
}
// LocalPeer is the Peer on this side
func
(
c
*
singleConn
)
LocalPeer
()
peer
.
ID
{
return
c
.
local
}
// RemotePeer is the Peer on the remote side
func
(
c
*
singleConn
)
RemotePeer
()
peer
.
ID
{
return
c
.
remote
}
// Read reads data, net.Conn style
func
(
c
*
singleConn
)
Read
(
buf
[]
byte
)
(
int
,
error
)
{
return
c
.
maconn
.
Read
(
buf
)
}
// Write writes data, net.Conn style
func
(
c
*
singleConn
)
Write
(
buf
[]
byte
)
(
int
,
error
)
{
return
c
.
maconn
.
Write
(
buf
)
}
// ID returns the ID of a given Conn.
func
ID
(
c
Conn
)
string
{
l
:=
fmt
.
Sprintf
(
"%s/%s"
,
c
.
LocalMultiaddr
(),
c
.
LocalPeer
()
.
Pretty
())
r
:=
fmt
.
Sprintf
(
"%s/%s"
,
c
.
RemoteMultiaddr
(),
c
.
RemotePeer
()
.
Pretty
())
lh
:=
u
.
Hash
([]
byte
(
l
))
rh
:=
u
.
Hash
([]
byte
(
r
))
ch
:=
u
.
XOR
(
lh
,
rh
)
return
peer
.
ID
(
ch
)
.
Pretty
()
}
// String returns the user-friendly String representation of a conn
func
String
(
c
Conn
,
typ
string
)
string
{
return
fmt
.
Sprintf
(
"%s (%s) <-- %s %p --> (%s) %s"
,
c
.
LocalPeer
(),
c
.
LocalMultiaddr
(),
typ
,
c
,
c
.
RemoteMultiaddr
(),
c
.
RemotePeer
())
}
This diff is collapsed.
Click to expand it.
p2p/net/conn/conn_test.go
deleted
100644 → 0
View file @
0aaec876
package
conn
import
(
"bytes"
"fmt"
"runtime"
"sync"
"testing"
"time"
"context"
msgio
"github.com/jbenet/go-msgio"
travis
"github.com/libp2p/go-libp2p/testutil/ci/travis"
)
func
msgioWrap
(
c
Conn
)
msgio
.
ReadWriter
{
return
msgio
.
NewReadWriter
(
c
)
}
func
testOneSendRecv
(
t
*
testing
.
T
,
c1
,
c2
Conn
)
{
mc1
:=
msgioWrap
(
c1
)
mc2
:=
msgioWrap
(
c2
)
log
.
Debugf
(
"testOneSendRecv from %s to %s"
,
c1
.
LocalPeer
(),
c2
.
LocalPeer
())
m1
:=
[]
byte
(
"hello"
)
if
err
:=
mc1
.
WriteMsg
(
m1
);
err
!=
nil
{
t
.
Fatal
(
err
)
}
m2
,
err
:=
mc2
.
ReadMsg
()
if
err
!=
nil
{
t
.
Fatal
(
err
)
}
if
!
bytes
.
Equal
(
m1
,
m2
)
{
t
.
Fatal
(
"failed to send: %s %s"
,
m1
,
m2
)
}
}
func
testNotOneSendRecv
(
t
*
testing
.
T
,
c1
,
c2
Conn
)
{
mc1
:=
msgioWrap
(
c1
)
mc2
:=
msgioWrap
(
c2
)
m1
:=
[]
byte
(
"hello"
)
if
err
:=
mc1
.
WriteMsg
(
m1
);
err
==
nil
{
t
.
Fatal
(
"write should have failed"
,
err
)
}
_
,
err
:=
mc2
.
ReadMsg
()
if
err
==
nil
{
t
.
Fatal
(
"read should have failed"
,
err
)
}
}
func
TestClose
(
t
*
testing
.
T
)
{
// t.Skip("Skipping in favor of another test")
ctx
,
cancel
:=
context
.
WithCancel
(
context
.
Background
())
defer
cancel
()
c1
,
c2
,
_
,
_
:=
setupSingleConn
(
t
,
ctx
)
testOneSendRecv
(
t
,
c1
,
c2
)
testOneSendRecv
(
t
,
c2
,
c1
)
c1
.
Close
()
testNotOneSendRecv
(
t
,
c1
,
c2
)
c2
.
Close
()
testNotOneSendRecv
(
t
,
c2
,
c1
)
testNotOneSendRecv
(
t
,
c1
,
c2
)
}
func
TestCloseLeak
(
t
*
testing
.
T
)
{
// t.Skip("Skipping in favor of another test")
if
testing
.
Short
()
{
t
.
SkipNow
()
}
if
travis
.
IsRunning
()
{
t
.
Skip
(
"this doesn't work well on travis"
)
}
var
wg
sync
.
WaitGroup
runPair
:=
func
(
num
int
)
{
ctx
,
cancel
:=
context
.
WithCancel
(
context
.
Background
())
c1
,
c2
,
_
,
_
:=
setupSingleConn
(
t
,
ctx
)
mc1
:=
msgioWrap
(
c1
)
mc2
:=
msgioWrap
(
c2
)
for
i
:=
0
;
i
<
num
;
i
++
{
b1
:=
[]
byte
(
fmt
.
Sprintf
(
"beep%d"
,
i
))
mc1
.
WriteMsg
(
b1
)
b2
,
err
:=
mc2
.
ReadMsg
()
if
err
!=
nil
{
panic
(
err
)
}
if
!
bytes
.
Equal
(
b1
,
b2
)
{
panic
(
fmt
.
Errorf
(
"bytes not equal: %s != %s"
,
b1
,
b2
))
}
b2
=
[]
byte
(
fmt
.
Sprintf
(
"boop%d"
,
i
))
mc2
.
WriteMsg
(
b2
)
b1
,
err
=
mc1
.
ReadMsg
()
if
err
!=
nil
{
panic
(
err
)
}
if
!
bytes
.
Equal
(
b1
,
b2
)
{
panic
(
fmt
.
Errorf
(
"bytes not equal: %s != %s"
,
b1
,
b2
))
}
<-
time
.
After
(
time
.
Microsecond
*
5
)
}
c1
.
Close
()
c2
.
Close
()
cancel
()
// close the listener
wg
.
Done
()
}
var
cons
=
5
var
msgs
=
50
log
.
Debugf
(
"Running %d connections * %d msgs.
\n
"
,
cons
,
msgs
)
for
i
:=
0
;
i
<
cons
;
i
++
{
wg
.
Add
(
1
)
go
runPair
(
msgs
)
}
log
.
Debugf
(
"Waiting...
\n
"
)
wg
.
Wait
()
// done!
time
.
Sleep
(
time
.
Millisecond
*
150
)
ngr
:=
runtime
.
NumGoroutine
()
if
ngr
>
25
{
// note, this is really innacurate
//panic("uncomment me to debug")
t
.
Fatal
(
"leaking goroutines:"
,
ngr
)
}
}
This diff is collapsed.
Click to expand it.
p2p/net/conn/dial.go
deleted
100644 → 0
View file @
0aaec876
package
conn
import
(
"fmt"
"math/rand"
"strings"
"time"
"context"
ci
"github.com/ipfs/go-libp2p-crypto"
lgbl
"github.com/ipfs/go-libp2p-loggables"
peer
"github.com/ipfs/go-libp2p-peer"
ma
"github.com/jbenet/go-multiaddr"
manet
"github.com/jbenet/go-multiaddr-net"
transport
"github.com/libp2p/go-libp2p-transport"
addrutil
"github.com/libp2p/go-libp2p/p2p/net/swarm/addr"
msmux
"github.com/whyrusleeping/go-multistream"
)
type
WrapFunc
func
(
transport
.
Conn
)
transport
.
Conn
func
NewDialer
(
p
peer
.
ID
,
pk
ci
.
PrivKey
,
wrap
WrapFunc
)
*
Dialer
{
return
&
Dialer
{
LocalPeer
:
p
,
PrivateKey
:
pk
,
Wrapper
:
wrap
,
fallback
:
new
(
transport
.
FallbackDialer
),
}
}
// String returns the string rep of d.
func
(
d
*
Dialer
)
String
()
string
{
return
fmt
.
Sprintf
(
"<Dialer %s ...>"
,
d
.
LocalPeer
)
}
// Dial connects to a peer over a particular address
// Ensures raddr is part of peer.Addresses()
// Example: d.DialAddr(ctx, peer.Addresses()[0], peer)
func
(
d
*
Dialer
)
Dial
(
ctx
context
.
Context
,
raddr
ma
.
Multiaddr
,
remote
peer
.
ID
)
(
Conn
,
error
)
{
logdial
:=
lgbl
.
Dial
(
"conn"
,
d
.
LocalPeer
,
remote
,
nil
,
raddr
)
logdial
[
"encrypted"
]
=
(
d
.
PrivateKey
!=
nil
)
// log wether this will be an encrypted dial or not.
defer
log
.
EventBegin
(
ctx
,
"connDial"
,
logdial
)
.
Done
()
var
connOut
Conn
var
errOut
error
done
:=
make
(
chan
struct
{})
// do it async to ensure we respect don contexteone
go
func
()
{
defer
func
()
{
select
{
case
done
<-
struct
{}{}
:
case
<-
ctx
.
Done
()
:
}
}()
maconn
,
err
:=
d
.
rawConnDial
(
ctx
,
raddr
,
remote
)
if
err
!=
nil
{
errOut
=
err
return
}
if
d
.
Wrapper
!=
nil
{
maconn
=
d
.
Wrapper
(
maconn
)
}
cryptoProtoChoice
:=
SecioTag
if
!
EncryptConnections
||
d
.
PrivateKey
==
nil
{
cryptoProtoChoice
=
NoEncryptionTag
}
maconn
.
SetReadDeadline
(
time
.
Now
()
.
Add
(
NegotiateReadTimeout
))
err
=
msmux
.
SelectProtoOrFail
(
cryptoProtoChoice
,
maconn
)
if
err
!=
nil
{
errOut
=
err
return
}
maconn
.
SetReadDeadline
(
time
.
Time
{})
c
,
err
:=
newSingleConn
(
ctx
,
d
.
LocalPeer
,
remote
,
maconn
)
if
err
!=
nil
{
maconn
.
Close
()
errOut
=
err
return
}
if
d
.
PrivateKey
==
nil
||
!
EncryptConnections
{
log
.
Warning
(
"dialer %s dialing INSECURELY %s at %s!"
,
d
,
remote
,
raddr
)
connOut
=
c
return
}
c2
,
err
:=
newSecureConn
(
ctx
,
d
.
PrivateKey
,
c
)
if
err
!=
nil
{
errOut
=
err
c
.
Close
()
return
}
connOut
=
c2
}()
select
{
case
<-
ctx
.
Done
()
:
logdial
[
"error"
]
=
ctx
.
Err
()
logdial
[
"dial"
]
=
"failure"
return
nil
,
ctx
.
Err
()
case
<-
done
:
// whew, finished.
}
if
errOut
!=
nil
{
logdial
[
"error"
]
=
errOut
logdial
[
"dial"
]
=
"failure"
return
nil
,
errOut
}
logdial
[
"dial"
]
=
"success"
return
connOut
,
nil
}
func
(
d
*
Dialer
)
AddDialer
(
pd
transport
.
Dialer
)
{
d
.
Dialers
=
append
(
d
.
Dialers
,
pd
)
}
// returns dialer that can dial the given address
func
(
d
*
Dialer
)
subDialerForAddr
(
raddr
ma
.
Multiaddr
)
transport
.
Dialer
{
for
_
,
pd
:=
range
d
.
Dialers
{
if
pd
.
Matches
(
raddr
)
{
return
pd
}
}
if
d
.
fallback
.
Matches
(
raddr
)
{
return
d
.
fallback
}
return
nil
}
// rawConnDial dials the underlying net.Conn + manet.Conns
func
(
d
*
Dialer
)
rawConnDial
(
ctx
context
.
Context
,
raddr
ma
.
Multiaddr
,
remote
peer
.
ID
)
(
transport
.
Conn
,
error
)
{
if
strings
.
HasPrefix
(
raddr
.
String
(),
"/ip4/0.0.0.0"
)
{
log
.
Event
(
ctx
,
"connDialZeroAddr"
,
lgbl
.
Dial
(
"conn"
,
d
.
LocalPeer
,
remote
,
nil
,
raddr
))
return
nil
,
fmt
.
Errorf
(
"Attempted to connect to zero address: %s"
,
raddr
)
}
sd
:=
d
.
subDialerForAddr
(
raddr
)
if
sd
==
nil
{
return
nil
,
fmt
.
Errorf
(
"no dialer for %s"
,
raddr
)
}
return
sd
.
DialContext
(
ctx
,
raddr
)
}
func
pickLocalAddr
(
laddrs
[]
ma
.
Multiaddr
,
raddr
ma
.
Multiaddr
)
(
laddr
ma
.
Multiaddr
)
{
if
len
(
laddrs
)
<
1
{
return
nil
}
// make sure that we ONLY use local addrs that match the remote addr.
laddrs
=
manet
.
AddrMatch
(
raddr
,
laddrs
)
if
len
(
laddrs
)
<
1
{
return
nil
}
// make sure that we ONLY use local addrs that CAN dial the remote addr.
// filter out all the local addrs that aren't capable
raddrIPLayer
:=
ma
.
Split
(
raddr
)[
0
]
raddrIsLoopback
:=
manet
.
IsIPLoopback
(
raddrIPLayer
)
raddrIsLinkLocal
:=
manet
.
IsIP6LinkLocal
(
raddrIPLayer
)
laddrs
=
addrutil
.
FilterAddrs
(
laddrs
,
func
(
a
ma
.
Multiaddr
)
bool
{
laddrIPLayer
:=
ma
.
Split
(
a
)[
0
]
laddrIsLoopback
:=
manet
.
IsIPLoopback
(
laddrIPLayer
)
laddrIsLinkLocal
:=
manet
.
IsIP6LinkLocal
(
laddrIPLayer
)
if
laddrIsLoopback
{
// our loopback addrs can only dial loopbacks.
return
raddrIsLoopback
}
if
laddrIsLinkLocal
{
return
raddrIsLinkLocal
// out linklocal addrs can only dial link locals.
}
return
true
})
// TODO pick with a good heuristic
// we use a random one for now to prevent bad addresses from making nodes unreachable
// with a random selection, multiple tries may work.
return
laddrs
[
rand
.
Intn
(
len
(
laddrs
))]
}
// MultiaddrProtocolsMatch returns whether two multiaddrs match in protocol stacks.
func
MultiaddrProtocolsMatch
(
a
,
b
ma
.
Multiaddr
)
bool
{
ap
:=
a
.
Protocols
()
bp
:=
b
.
Protocols
()
if
len
(
ap
)
!=
len
(
bp
)
{
return
false
}
for
i
,
api
:=
range
ap
{
if
api
.
Code
!=
bp
[
i
]
.
Code
{
return
false
}
}
return
true
}
// MultiaddrNetMatch returns the first Multiaddr found to match network.
func
MultiaddrNetMatch
(
tgt
ma
.
Multiaddr
,
srcs
[]
ma
.
Multiaddr
)
ma
.
Multiaddr
{
for
_
,
a
:=
range
srcs
{
if
MultiaddrProtocolsMatch
(
tgt
,
a
)
{
return
a
}
}
return
nil
}
This diff is collapsed.
Click to expand it.
p2p/net/conn/dial_test.go
deleted
100644 → 0
View file @
0aaec876
package
conn
import
(
"bytes"
"fmt"
"io"
"net"
"runtime"
"strings"
"sync"
"testing"
"time"
ic
"github.com/ipfs/go-libp2p-crypto"
peer
"github.com/ipfs/go-libp2p-peer"
transport
"github.com/libp2p/go-libp2p-transport"
tu
"github.com/libp2p/go-libp2p/testutil"
tcpt
"github.com/libp2p/go-tcp-transport"
"context"
ma
"github.com/jbenet/go-multiaddr"
msmux
"github.com/whyrusleeping/go-multistream"
grc
"github.com/whyrusleeping/gorocheck"
)
func
goroFilter
(
r
*
grc
.
Goroutine
)
bool
{
return
strings
.
Contains
(
r
.
Function
,
"go-log."
)
||
strings
.
Contains
(
r
.
Stack
[
0
],
"testing.(*T).Run"
)
}
func
echoListen
(
ctx
context
.
Context
,
listener
Listener
)
{
for
{
c
,
err
:=
listener
.
Accept
()
if
err
!=
nil
{
select
{
case
<-
ctx
.
Done
()
:
return
default
:
}
if
ne
,
ok
:=
err
.
(
net
.
Error
);
ok
&&
ne
.
Temporary
()
{
<-
time
.
After
(
time
.
Microsecond
*
10
)
continue
}
log
.
Debugf
(
"echoListen: listener appears to be closing"
)
return
}
go
echo
(
c
.
(
Conn
))
}
}
func
echo
(
c
Conn
)
{
io
.
Copy
(
c
,
c
)
}
func
setupSecureConn
(
t
*
testing
.
T
,
ctx
context
.
Context
)
(
a
,
b
Conn
,
p1
,
p2
tu
.
PeerNetParams
)
{
return
setupConn
(
t
,
ctx
,
true
)
}
func
setupSingleConn
(
t
*
testing
.
T
,
ctx
context
.
Context
)
(
a
,
b
Conn
,
p1
,
p2
tu
.
PeerNetParams
)
{
return
setupConn
(
t
,
ctx
,
false
)
}
func
Listen
(
ctx
context
.
Context
,
addr
ma
.
Multiaddr
,
local
peer
.
ID
,
sk
ic
.
PrivKey
)
(
Listener
,
error
)
{
list
,
err
:=
tcpt
.
NewTCPTransport
()
.
Listen
(
addr
)
if
err
!=
nil
{
return
nil
,
err
}
return
WrapTransportListener
(
ctx
,
list
,
local
,
sk
)
}
func
dialer
(
t
*
testing
.
T
,
a
ma
.
Multiaddr
)
transport
.
Dialer
{
tpt
:=
tcpt
.
NewTCPTransport
()
tptd
,
err
:=
tpt
.
Dialer
(
a
)
if
err
!=
nil
{
t
.
Fatal
(
err
)
}
return
tptd
}
func
setupConn
(
t
*
testing
.
T
,
ctx
context
.
Context
,
secure
bool
)
(
a
,
b
Conn
,
p1
,
p2
tu
.
PeerNetParams
)
{
p1
=
tu
.
RandPeerNetParamsOrFatal
(
t
)
p2
=
tu
.
RandPeerNetParamsOrFatal
(
t
)
key1
:=
p1
.
PrivKey
key2
:=
p2
.
PrivKey
if
!
secure
{
key1
=
nil
key2
=
nil
}
l1
,
err
:=
Listen
(
ctx
,
p1
.
Addr
,
p1
.
ID
,
key1
)
if
err
!=
nil
{
t
.
Fatal
(
err
)
}
p1
.
Addr
=
l1
.
Multiaddr
()
// Addr has been determined by kernel.
d2
:=
&
Dialer
{
LocalPeer
:
p2
.
ID
,
PrivateKey
:
key2
,
}
d2
.
AddDialer
(
dialer
(
t
,
p2
.
Addr
))
var
c2
Conn
done
:=
make
(
chan
error
)
go
func
()
{
defer
close
(
done
)
var
err
error
c2
,
err
=
d2
.
Dial
(
ctx
,
p1
.
Addr
,
p1
.
ID
)
if
err
!=
nil
{
done
<-
err
return
}
// if secure, need to read + write, as that's what triggers the handshake.
if
secure
{
if
err
:=
sayHello
(
c2
);
err
!=
nil
{
done
<-
err
}
}
}()
c1
,
err
:=
l1
.
Accept
()
if
err
!=
nil
{
t
.
Fatal
(
"failed to accept"
,
err
)
}
// if secure, need to read + write, as that's what triggers the handshake.
if
secure
{
if
err
:=
sayHello
(
c1
);
err
!=
nil
{
done
<-
err
}
}
if
err
:=
<-
done
;
err
!=
nil
{
t
.
Fatal
(
err
)
}
return
c1
.
(
Conn
),
c2
,
p1
,
p2
}
func
sayHello
(
c
net
.
Conn
)
error
{
h
:=
[]
byte
(
"hello"
)
if
_
,
err
:=
c
.
Write
(
h
);
err
!=
nil
{
return
err
}
if
_
,
err
:=
c
.
Read
(
h
);
err
!=
nil
{
return
err
}
if
string
(
h
)
!=
"hello"
{
return
fmt
.
Errorf
(
"did not get hello"
)
}
return
nil
}
func
testDialer
(
t
*
testing
.
T
,
secure
bool
)
{
// t.Skip("Skipping in favor of another test")
p1
:=
tu
.
RandPeerNetParamsOrFatal
(
t
)
p2
:=
tu
.
RandPeerNetParamsOrFatal
(
t
)
key1
:=
p1
.
PrivKey
key2
:=
p2
.
PrivKey
if
!
secure
{
key1
=
nil
key2
=
nil
t
.
Log
(
"testing insecurely"
)
}
else
{
t
.
Log
(
"testing securely"
)
}
ctx
,
cancel
:=
context
.
WithCancel
(
context
.
Background
())
l1
,
err
:=
Listen
(
ctx
,
p1
.
Addr
,
p1
.
ID
,
key1
)
if
err
!=
nil
{
t
.
Fatal
(
err
)
}
p1
.
Addr
=
l1
.
Multiaddr
()
// Addr has been determined by kernel.
d2
:=
&
Dialer
{
LocalPeer
:
p2
.
ID
,
PrivateKey
:
key2
,
}
d2
.
AddDialer
(
dialer
(
t
,
p2
.
Addr
))
go
echoListen
(
ctx
,
l1
)
c
,
err
:=
d2
.
Dial
(
ctx
,
p1
.
Addr
,
p1
.
ID
)
if
err
!=
nil
{
t
.
Fatal
(
"error dialing peer"
,
err
)
}
// fmt.Println("sending")
mc
:=
msgioWrap
(
c
)
mc
.
WriteMsg
([]
byte
(
"beep"
))
mc
.
WriteMsg
([]
byte
(
"boop"
))
out
,
err
:=
mc
.
ReadMsg
()
if
err
!=
nil
{
t
.
Fatal
(
err
)
}
// fmt.Println("recving", string(out))
data
:=
string
(
out
)
if
data
!=
"beep"
{
t
.
Error
(
"unexpected conn output"
,
data
)
}
out
,
err
=
mc
.
ReadMsg
()
if
err
!=
nil
{
t
.
Fatal
(
err
)
}
data
=
string
(
out
)
if
string
(
out
)
!=
"boop"
{
t
.
Error
(
"unexpected conn output"
,
data
)
}
// fmt.Println("closing")
c
.
Close
()
l1
.
Close
()
cancel
()
}
func
TestDialerInsecure
(
t
*
testing
.
T
)
{
// t.Skip("Skipping in favor of another test")
testDialer
(
t
,
false
)
}
func
TestDialerSecure
(
t
*
testing
.
T
)
{
// t.Skip("Skipping in favor of another test")
testDialer
(
t
,
true
)
}
func
testDialerCloseEarly
(
t
*
testing
.
T
,
secure
bool
)
{
// t.Skip("Skipping in favor of another test")
p1
:=
tu
.
RandPeerNetParamsOrFatal
(
t
)
p2
:=
tu
.
RandPeerNetParamsOrFatal
(
t
)
key1
:=
p1
.
PrivKey
if
!
secure
{
key1
=
nil
t
.
Log
(
"testing insecurely"
)
}
else
{
t
.
Log
(
"testing securely"
)
}
ctx
,
cancel
:=
context
.
WithCancel
(
context
.
Background
())
l1
,
err
:=
Listen
(
ctx
,
p1
.
Addr
,
p1
.
ID
,
key1
)
if
err
!=
nil
{
t
.
Fatal
(
err
)
}
p1
.
Addr
=
l1
.
Multiaddr
()
// Addr has been determined by kernel.
// lol nesting
d2
:=
&
Dialer
{
LocalPeer
:
p2
.
ID
,
PrivateKey
:
p2
.
PrivKey
,
//-- dont give it key. we'll just close the conn.
}
d2
.
AddDialer
(
dialer
(
t
,
p2
.
Addr
))
errs
:=
make
(
chan
error
,
100
)
done
:=
make
(
chan
struct
{},
1
)
gotclosed
:=
make
(
chan
struct
{},
1
)
go
func
()
{
defer
func
()
{
done
<-
struct
{}{}
}()
c
,
err
:=
l1
.
Accept
()
if
err
!=
nil
{
if
strings
.
Contains
(
err
.
Error
(),
"closed"
)
{
gotclosed
<-
struct
{}{}
return
}
errs
<-
err
}
if
_
,
err
:=
c
.
Write
([]
byte
(
"hello"
));
err
!=
nil
{
gotclosed
<-
struct
{}{}
return
}
errs
<-
fmt
.
Errorf
(
"wrote to conn"
)
}()
c
,
err
:=
d2
.
Dial
(
ctx
,
p1
.
Addr
,
p1
.
ID
)
if
err
!=
nil
{
t
.
Fatal
(
err
)
}
c
.
Close
()
// close it early.
readerrs
:=
func
()
{
for
{
select
{
case
e
:=
<-
errs
:
t
.
Error
(
e
)
default
:
return
}
}
}
readerrs
()
l1
.
Close
()
<-
done
cancel
()
readerrs
()
close
(
errs
)
select
{
case
<-
gotclosed
:
default
:
t
.
Error
(
"did not get closed"
)
}
}
// we dont do a handshake with singleConn, so cant "close early."
// func TestDialerCloseEarlyInsecure(t *testing.T) {
// // t.Skip("Skipping in favor of another test")
// testDialerCloseEarly(t, false)
// }
func
TestDialerCloseEarlySecure
(
t
*
testing
.
T
)
{
// t.Skip("Skipping in favor of another test")
testDialerCloseEarly
(
t
,
true
)
}
func
TestMultistreamHeader
(
t
*
testing
.
T
)
{
ctx
,
cancel
:=
context
.
WithCancel
(
context
.
Background
())
defer
cancel
()
p1
:=
tu
.
RandPeerNetParamsOrFatal
(
t
)
l1
,
err
:=
Listen
(
ctx
,
p1
.
Addr
,
p1
.
ID
,
p1
.
PrivKey
)
if
err
!=
nil
{
t
.
Fatal
(
err
)
}
p1
.
Addr
=
l1
.
Multiaddr
()
// Addr has been determined by kernel.
go
func
()
{
_
,
_
=
l1
.
Accept
()
}()
con
,
err
:=
net
.
Dial
(
"tcp"
,
l1
.
Addr
()
.
String
())
if
err
!=
nil
{
t
.
Fatal
(
err
)
}
defer
con
.
Close
()
err
=
msmux
.
SelectProtoOrFail
(
SecioTag
,
con
)
if
err
!=
nil
{
t
.
Fatal
(
err
)
}
}
func
TestFailedAccept
(
t
*
testing
.
T
)
{
ctx
,
cancel
:=
context
.
WithCancel
(
context
.
Background
())
defer
cancel
()
p1
:=
tu
.
RandPeerNetParamsOrFatal
(
t
)
l1
,
err
:=
Listen
(
ctx
,
p1
.
Addr
,
p1
.
ID
,
p1
.
PrivKey
)
if
err
!=
nil
{
t
.
Fatal
(
err
)
}
p1
.
Addr
=
l1
.
Multiaddr
()
// Addr has been determined by kernel.
done
:=
make
(
chan
struct
{})
go
func
()
{
defer
close
(
done
)
con
,
err
:=
net
.
Dial
(
"tcp"
,
l1
.
Addr
()
.
String
())
if
err
!=
nil
{
t
.
Error
(
"first dial failed: "
,
err
)
}
// write some garbage
con
.
Write
(
bytes
.
Repeat
([]
byte
{
255
},
1000
))
con
.
Close
()
con
,
err
=
net
.
Dial
(
"tcp"
,
l1
.
Addr
()
.
String
())
if
err
!=
nil
{
t
.
Error
(
"second dial failed: "
,
err
)
}
defer
con
.
Close
()
err
=
msmux
.
SelectProtoOrFail
(
SecioTag
,
con
)
if
err
!=
nil
{
t
.
Error
(
"msmux select failed: "
,
err
)
}
}()
c
,
err
:=
l1
.
Accept
()
if
err
!=
nil
{
t
.
Fatal
(
"connections after a failed accept should still work: "
,
err
)
}
c
.
Close
()
<-
done
}
func
TestHangingAccept
(
t
*
testing
.
T
)
{
ctx
,
cancel
:=
context
.
WithCancel
(
context
.
Background
())
defer
cancel
()
p1
:=
tu
.
RandPeerNetParamsOrFatal
(
t
)
l1
,
err
:=
Listen
(
ctx
,
p1
.
Addr
,
p1
.
ID
,
p1
.
PrivKey
)
if
err
!=
nil
{
t
.
Fatal
(
err
)
}
p1
.
Addr
=
l1
.
Multiaddr
()
// Addr has been determined by kernel.
done
:=
make
(
chan
struct
{})
go
func
()
{
defer
close
(
done
)
con
,
err
:=
net
.
Dial
(
"tcp"
,
l1
.
Addr
()
.
String
())
if
err
!=
nil
{
t
.
Error
(
"first dial failed: "
,
err
)
}
// hang this connection
defer
con
.
Close
()
// ensure that the first conn hits first
time
.
Sleep
(
time
.
Millisecond
*
50
)
con2
,
err
:=
net
.
Dial
(
"tcp"
,
l1
.
Addr
()
.
String
())
if
err
!=
nil
{
t
.
Error
(
"second dial failed: "
,
err
)
}
defer
con2
.
Close
()
err
=
msmux
.
SelectProtoOrFail
(
SecioTag
,
con2
)
if
err
!=
nil
{
t
.
Error
(
"msmux select failed: "
,
err
)
}
_
,
err
=
con2
.
Write
([]
byte
(
"test"
))
if
err
!=
nil
{
t
.
Error
(
"con write failed: "
,
err
)
}
}()
c
,
err
:=
l1
.
Accept
()
if
err
!=
nil
{
t
.
Fatal
(
"connections after a failed accept should still work: "
,
err
)
}
c
.
Close
()
<-
done
}
// This test kicks off N (=300) concurrent dials, which wait d (=20ms) seconds before failing.
// That wait holds up the handshake (multistream AND crypto), which will happen BEFORE
// l1.Accept() returns a connection. This test checks that the handshakes all happen
// concurrently in the listener side, and not sequentially. This ensures that a hanging dial
// will not block the listener from accepting other dials concurrently.
func
TestConcurrentAccept
(
t
*
testing
.
T
)
{
ctx
,
cancel
:=
context
.
WithCancel
(
context
.
Background
())
defer
cancel
()
p1
:=
tu
.
RandPeerNetParamsOrFatal
(
t
)
l1
,
err
:=
Listen
(
ctx
,
p1
.
Addr
,
p1
.
ID
,
p1
.
PrivKey
)
if
err
!=
nil
{
t
.
Fatal
(
err
)
}
n
:=
300
delay
:=
time
.
Millisecond
*
20
if
runtime
.
GOOS
==
"darwin"
{
n
=
100
}
p1
.
Addr
=
l1
.
Multiaddr
()
// Addr has been determined by kernel.
var
wg
sync
.
WaitGroup
for
i
:=
0
;
i
<
n
;
i
++
{
wg
.
Add
(
1
)
go
func
()
{
defer
wg
.
Done
()
con
,
err
:=
net
.
Dial
(
"tcp"
,
l1
.
Addr
()
.
String
())
if
err
!=
nil
{
log
.
Error
(
err
)
t
.
Error
(
"first dial failed: "
,
err
)
return
}
// hang this connection
defer
con
.
Close
()
time
.
Sleep
(
delay
)
err
=
msmux
.
SelectProtoOrFail
(
SecioTag
,
con
)
if
err
!=
nil
{
t
.
Error
(
err
)
}
}()
}
before
:=
time
.
Now
()
for
i
:=
0
;
i
<
n
;
i
++
{
c
,
err
:=
l1
.
Accept
()
if
err
!=
nil
{
t
.
Fatal
(
"connections after a failed accept should still work: "
,
err
)
}
c
.
Close
()
}
limit
:=
delay
*
time
.
Duration
(
n
)
took
:=
time
.
Since
(
before
)
if
took
>
limit
{
t
.
Fatal
(
"took too long!"
)
}
log
.
Errorf
(
"took: %s (less than %s)"
,
took
,
limit
)
l1
.
Close
()
wg
.
Wait
()
cancel
()
time
.
Sleep
(
time
.
Millisecond
*
100
)
err
=
grc
.
CheckForLeaks
(
goroFilter
)
if
err
!=
nil
{
t
.
Fatal
(
err
)
}
}
func
TestConnectionTimeouts
(
t
*
testing
.
T
)
{
ctx
,
cancel
:=
context
.
WithCancel
(
context
.
Background
())
defer
cancel
()
old
:=
NegotiateReadTimeout
NegotiateReadTimeout
=
time
.
Second
*
5
defer
func
()
{
NegotiateReadTimeout
=
old
}()
p1
:=
tu
.
RandPeerNetParamsOrFatal
(
t
)
l1
,
err
:=
Listen
(
ctx
,
p1
.
Addr
,
p1
.
ID
,
p1
.
PrivKey
)
if
err
!=
nil
{
t
.
Fatal
(
err
)
}
n
:=
100
if
runtime
.
GOOS
==
"darwin"
{
n
=
50
}
p1
.
Addr
=
l1
.
Multiaddr
()
// Addr has been determined by kernel.
var
wg
sync
.
WaitGroup
for
i
:=
0
;
i
<
n
;
i
++
{
wg
.
Add
(
1
)
go
func
()
{
defer
wg
.
Done
()
con
,
err
:=
net
.
Dial
(
"tcp"
,
l1
.
Addr
()
.
String
())
if
err
!=
nil
{
log
.
Error
(
err
)
t
.
Error
(
"first dial failed: "
,
err
)
return
}
defer
con
.
Close
()
// hang this connection until timeout
io
.
ReadFull
(
con
,
make
([]
byte
,
1000
))
}()
}
// wait to make sure the hanging dials have started
time
.
Sleep
(
time
.
Millisecond
*
50
)
good_n
:=
20
for
i
:=
0
;
i
<
good_n
;
i
++
{
wg
.
Add
(
1
)
go
func
()
{
defer
wg
.
Done
()
con
,
err
:=
net
.
Dial
(
"tcp"
,
l1
.
Addr
()
.
String
())
if
err
!=
nil
{
log
.
Error
(
err
)
t
.
Error
(
"first dial failed: "
,
err
)
return
}
defer
con
.
Close
()
// dial these ones through
err
=
msmux
.
SelectProtoOrFail
(
SecioTag
,
con
)
if
err
!=
nil
{
t
.
Error
(
err
)
}
}()
}
before
:=
time
.
Now
()
for
i
:=
0
;
i
<
good_n
;
i
++
{
c
,
err
:=
l1
.
Accept
()
if
err
!=
nil
{
t
.
Fatal
(
"connections during hung dials should still work: "
,
err
)
}
c
.
Close
()
}
took
:=
time
.
Since
(
before
)
if
took
>
time
.
Second
*
5
{
t
.
Fatal
(
"hanging dials shouldnt block good dials"
)
}
wg
.
Wait
()
go
func
()
{
con
,
err
:=
net
.
Dial
(
"tcp"
,
l1
.
Addr
()
.
String
())
if
err
!=
nil
{
log
.
Error
(
err
)
t
.
Error
(
"first dial failed: "
,
err
)
return
}
defer
con
.
Close
()
// dial these ones through
err
=
msmux
.
SelectProtoOrFail
(
SecioTag
,
con
)
if
err
!=
nil
{
t
.
Error
(
err
)
}
}()
// make sure we can dial in still after a bunch of timeouts
con
,
err
:=
l1
.
Accept
()
if
err
!=
nil
{
t
.
Fatal
(
err
)
}
con
.
Close
()
l1
.
Close
()
cancel
()
time
.
Sleep
(
time
.
Millisecond
*
100
)
err
=
grc
.
CheckForLeaks
(
goroFilter
)
if
err
!=
nil
{
t
.
Fatal
(
err
)
}
}
This diff is collapsed.
Click to expand it.
p2p/net/conn/interface.go
deleted
100644 → 0
View file @
0aaec876
package
conn
import
(
"io"
"net"
"time"
ic
"github.com/ipfs/go-libp2p-crypto"
peer
"github.com/ipfs/go-libp2p-peer"
ma
"github.com/jbenet/go-multiaddr"
transport
"github.com/libp2p/go-libp2p-transport"
filter
"github.com/libp2p/go-maddr-filter"
)
type
PeerConn
interface
{
io
.
Closer
// LocalPeer (this side) ID, PrivateKey, and Address
LocalPeer
()
peer
.
ID
LocalPrivateKey
()
ic
.
PrivKey
LocalMultiaddr
()
ma
.
Multiaddr
// RemotePeer ID, PublicKey, and Address
RemotePeer
()
peer
.
ID
RemotePublicKey
()
ic
.
PubKey
RemoteMultiaddr
()
ma
.
Multiaddr
}
// Conn is a generic message-based Peer-to-Peer connection.
type
Conn
interface
{
PeerConn
// ID is an identifier unique to this connection.
ID
()
string
// can't just say "net.Conn" cause we have duplicate methods.
LocalAddr
()
net
.
Addr
RemoteAddr
()
net
.
Addr
SetDeadline
(
t
time
.
Time
)
error
SetReadDeadline
(
t
time
.
Time
)
error
SetWriteDeadline
(
t
time
.
Time
)
error
io
.
Reader
io
.
Writer
}
// Dialer is an object that can open connections. We could have a "convenience"
// Dial function as before, but it would have many arguments, as dialing is
// no longer simple (need a peerstore, a local peer, a context, a network, etc)
type
Dialer
struct
{
// LocalPeer is the identity of the local Peer.
LocalPeer
peer
.
ID
// LocalAddrs is a set of local addresses to use.
//LocalAddrs []ma.Multiaddr
// Dialers are the sub-dialers usable by this dialer
// selected in order based on the address being dialed
Dialers
[]
transport
.
Dialer
// PrivateKey used to initialize a secure connection.
// Warning: if PrivateKey is nil, connection will not be secured.
PrivateKey
ic
.
PrivKey
// Wrapper to wrap the raw connection (optional)
Wrapper
WrapFunc
fallback
transport
.
Dialer
}
// Listener is an object that can accept connections. It matches net.Listener
type
Listener
interface
{
// Accept waits for and returns the next connection to the listener.
Accept
()
(
net
.
Conn
,
error
)
// Addr is the local address
Addr
()
net
.
Addr
// Multiaddr is the local multiaddr address
Multiaddr
()
ma
.
Multiaddr
// LocalPeer is the identity of the local Peer.
LocalPeer
()
peer
.
ID
SetAddrFilters
(
*
filter
.
Filters
)
// Close closes the listener.
// Any blocked Accept operations will be unblocked and return errors.
Close
()
error
}
// EncryptConnections is a global parameter because it should either be
// enabled or _completely disabled_. I.e. a node should only be able to talk
// to proper (encrypted) networks if it is encrypting all its transports.
// Running a node with disabled transport encryption is useful to debug the
// protocols, achieve implementation interop, or for private networks which
// -- for whatever reason -- _must_ run unencrypted.
var
EncryptConnections
=
true
This diff is collapsed.
Click to expand it.
p2p/net/conn/listen.go
deleted
100644 → 0
View file @
0aaec876
package
conn
import
(
"context"
"fmt"
"io"
"net"
"sync"
"time"
ic
"github.com/ipfs/go-libp2p-crypto"
peer
"github.com/ipfs/go-libp2p-peer"
ma
"github.com/jbenet/go-multiaddr"
tec
"github.com/jbenet/go-temp-err-catcher"
"github.com/jbenet/goprocess"
goprocessctx
"github.com/jbenet/goprocess/context"
transport
"github.com/libp2p/go-libp2p-transport"
filter
"github.com/libp2p/go-maddr-filter"
msmux
"github.com/whyrusleeping/go-multistream"
)
const
(
SecioTag
=
"/secio/1.0.0"
NoEncryptionTag
=
"/plaintext/1.0.0"
)
var
(
connAcceptBuffer
=
32
NegotiateReadTimeout
=
time
.
Second
*
60
)
// ConnWrapper is any function that wraps a raw multiaddr connection
type
ConnWrapper
func
(
transport
.
Conn
)
transport
.
Conn
// listener is an object that can accept connections. It implements Listener
type
listener
struct
{
transport
.
Listener
local
peer
.
ID
// LocalPeer is the identity of the local Peer
privk
ic
.
PrivKey
// private key to use to initialize secure conns
filters
*
filter
.
Filters
wrapper
ConnWrapper
catcher
tec
.
TempErrCatcher
proc
goprocess
.
Process
mux
*
msmux
.
MultistreamMuxer
incoming
chan
connErr
ctx
context
.
Context
}
func
(
l
*
listener
)
teardown
()
error
{
defer
log
.
Debugf
(
"listener closed: %s %s"
,
l
.
local
,
l
.
Multiaddr
())
return
l
.
Listener
.
Close
()
}
func
(
l
*
listener
)
Close
()
error
{
log
.
Debugf
(
"listener closing: %s %s"
,
l
.
local
,
l
.
Multiaddr
())
return
l
.
proc
.
Close
()
}
func
(
l
*
listener
)
String
()
string
{
return
fmt
.
Sprintf
(
"<Listener %s %s>"
,
l
.
local
,
l
.
Multiaddr
())
}
func
(
l
*
listener
)
SetAddrFilters
(
fs
*
filter
.
Filters
)
{
l
.
filters
=
fs
}
type
connErr
struct
{
conn
transport
.
Conn
err
error
}
// Accept waits for and returns the next connection to the listener.
// Note that unfortunately this
func
(
l
*
listener
)
Accept
()
(
net
.
Conn
,
error
)
{
for
con
:=
range
l
.
incoming
{
if
con
.
err
!=
nil
{
return
nil
,
con
.
err
}
c
,
err
:=
newSingleConn
(
l
.
ctx
,
l
.
local
,
""
,
con
.
conn
)
if
err
!=
nil
{
con
.
conn
.
Close
()
if
l
.
catcher
.
IsTemporary
(
err
)
{
continue
}
return
nil
,
err
}
if
l
.
privk
==
nil
||
!
EncryptConnections
{
log
.
Warning
(
"listener %s listening INSECURELY!"
,
l
)
return
c
,
nil
}
sc
,
err
:=
newSecureConn
(
l
.
ctx
,
l
.
privk
,
c
)
if
err
!=
nil
{
con
.
conn
.
Close
()
log
.
Infof
(
"ignoring conn we failed to secure: %s %s"
,
err
,
c
)
continue
}
return
sc
,
nil
}
return
nil
,
fmt
.
Errorf
(
"listener is closed"
)
}
func
(
l
*
listener
)
Addr
()
net
.
Addr
{
return
l
.
Listener
.
Addr
()
}
// Multiaddr is the identity of the local Peer.
// If there is an error converting from net.Addr to ma.Multiaddr,
// the return value will be nil.
func
(
l
*
listener
)
Multiaddr
()
ma
.
Multiaddr
{
return
l
.
Listener
.
Multiaddr
()
}
// LocalPeer is the identity of the local Peer.
func
(
l
*
listener
)
LocalPeer
()
peer
.
ID
{
return
l
.
local
}
func
(
l
*
listener
)
Loggable
()
map
[
string
]
interface
{}
{
return
map
[
string
]
interface
{}{
"listener"
:
map
[
string
]
interface
{}{
"peer"
:
l
.
LocalPeer
(),
"address"
:
l
.
Multiaddr
(),
"secure"
:
(
l
.
privk
!=
nil
),
},
}
}
func
(
l
*
listener
)
handleIncoming
()
{
var
wg
sync
.
WaitGroup
defer
func
()
{
wg
.
Wait
()
close
(
l
.
incoming
)
}()
wg
.
Add
(
1
)
defer
wg
.
Done
()
for
{
maconn
,
err
:=
l
.
Listener
.
Accept
()
if
err
!=
nil
{
if
l
.
catcher
.
IsTemporary
(
err
)
{
continue
}
l
.
incoming
<-
connErr
{
err
:
err
}
return
}
log
.
Debugf
(
"listener %s got connection: %s <---> %s"
,
l
,
maconn
.
LocalMultiaddr
(),
maconn
.
RemoteMultiaddr
())
if
l
.
filters
!=
nil
&&
l
.
filters
.
AddrBlocked
(
maconn
.
RemoteMultiaddr
())
{
log
.
Debugf
(
"blocked connection from %s"
,
maconn
.
RemoteMultiaddr
())
maconn
.
Close
()
continue
}
// If we have a wrapper func, wrap this conn
if
l
.
wrapper
!=
nil
{
maconn
=
l
.
wrapper
(
maconn
)
}
wg
.
Add
(
1
)
go
func
()
{
defer
wg
.
Done
()
maconn
.
SetReadDeadline
(
time
.
Now
()
.
Add
(
NegotiateReadTimeout
))
_
,
_
,
err
=
l
.
mux
.
Negotiate
(
maconn
)
if
err
!=
nil
{
log
.
Info
(
"incoming conn: negotiation of crypto protocol failed: "
,
err
)
maconn
.
Close
()
return
}
// clear read readline
maconn
.
SetReadDeadline
(
time
.
Time
{})
l
.
incoming
<-
connErr
{
conn
:
maconn
}
}()
}
}
func
WrapTransportListener
(
ctx
context
.
Context
,
ml
transport
.
Listener
,
local
peer
.
ID
,
sk
ic
.
PrivKey
)
(
Listener
,
error
)
{
l
:=
&
listener
{
Listener
:
ml
,
local
:
local
,
privk
:
sk
,
mux
:
msmux
.
NewMultistreamMuxer
(),
incoming
:
make
(
chan
connErr
,
connAcceptBuffer
),
ctx
:
ctx
,
}
l
.
proc
=
goprocessctx
.
WithContextAndTeardown
(
ctx
,
l
.
teardown
)
l
.
catcher
.
IsTemp
=
func
(
e
error
)
bool
{
// ignore connection breakages up to this point. but log them
if
e
==
io
.
EOF
{
log
.
Debugf
(
"listener ignoring conn with EOF: %s"
,
e
)
return
true
}
te
,
ok
:=
e
.
(
tec
.
Temporary
)
if
ok
{
log
.
Debugf
(
"listener ignoring conn with temporary err: %s"
,
e
)
return
te
.
Temporary
()
}
return
false
}
if
EncryptConnections
&&
sk
!=
nil
{
l
.
mux
.
AddHandler
(
SecioTag
,
nil
)
}
else
{
l
.
mux
.
AddHandler
(
NoEncryptionTag
,
nil
)
}
go
l
.
handleIncoming
()
log
.
Debugf
(
"Conn Listener on %s"
,
l
.
Multiaddr
())
log
.
Event
(
ctx
,
"swarmListen"
,
l
)
return
l
,
nil
}
type
ListenerConnWrapper
interface
{
SetConnWrapper
(
ConnWrapper
)
}
// SetConnWrapper assigns a maconn ConnWrapper to wrap all incoming
// connections with. MUST be set _before_ calling `Accept()`
func
(
l
*
listener
)
SetConnWrapper
(
cw
ConnWrapper
)
{
l
.
wrapper
=
cw
}
This diff is collapsed.
Click to expand it.
p2p/net/conn/secure_conn.go
deleted
100644 → 0
View file @
0aaec876
package
conn
import
(
"context"
"errors"
"net"
"time"
ic
"github.com/ipfs/go-libp2p-crypto"
peer
"github.com/ipfs/go-libp2p-peer"
secio
"github.com/ipfs/go-libp2p-secio"
ma
"github.com/jbenet/go-multiaddr"
)
// secureConn wraps another Conn object with an encrypted channel.
type
secureConn
struct
{
insecure
Conn
// the wrapped conn
secure
secio
.
Session
// secure Session
}
// newConn constructs a new connection
func
newSecureConn
(
ctx
context
.
Context
,
sk
ic
.
PrivKey
,
insecure
Conn
)
(
Conn
,
error
)
{
if
insecure
==
nil
{
return
nil
,
errors
.
New
(
"insecure is nil"
)
}
if
insecure
.
LocalPeer
()
==
""
{
return
nil
,
errors
.
New
(
"insecure.LocalPeer() is nil"
)
}
if
sk
==
nil
{
return
nil
,
errors
.
New
(
"private key is nil"
)
}
// NewSession performs the secure handshake, which takes multiple RTT
sessgen
:=
secio
.
SessionGenerator
{
LocalID
:
insecure
.
LocalPeer
(),
PrivateKey
:
sk
}
secure
,
err
:=
sessgen
.
NewSession
(
ctx
,
insecure
)
if
err
!=
nil
{
return
nil
,
err
}
conn
:=
&
secureConn
{
insecure
:
insecure
,
secure
:
secure
,
}
return
conn
,
nil
}
func
(
c
*
secureConn
)
Close
()
error
{
return
c
.
secure
.
Close
()
}
// ID is an identifier unique to this connection.
func
(
c
*
secureConn
)
ID
()
string
{
return
ID
(
c
)
}
func
(
c
*
secureConn
)
String
()
string
{
return
String
(
c
,
"secureConn"
)
}
func
(
c
*
secureConn
)
LocalAddr
()
net
.
Addr
{
return
c
.
insecure
.
LocalAddr
()
}
func
(
c
*
secureConn
)
RemoteAddr
()
net
.
Addr
{
return
c
.
insecure
.
RemoteAddr
()
}
func
(
c
*
secureConn
)
SetDeadline
(
t
time
.
Time
)
error
{
return
c
.
insecure
.
SetDeadline
(
t
)
}
func
(
c
*
secureConn
)
SetReadDeadline
(
t
time
.
Time
)
error
{
return
c
.
insecure
.
SetReadDeadline
(
t
)
}
func
(
c
*
secureConn
)
SetWriteDeadline
(
t
time
.
Time
)
error
{
return
c
.
insecure
.
SetWriteDeadline
(
t
)
}
// LocalMultiaddr is the Multiaddr on this side
func
(
c
*
secureConn
)
LocalMultiaddr
()
ma
.
Multiaddr
{
return
c
.
insecure
.
LocalMultiaddr
()
}
// RemoteMultiaddr is the Multiaddr on the remote side
func
(
c
*
secureConn
)
RemoteMultiaddr
()
ma
.
Multiaddr
{
return
c
.
insecure
.
RemoteMultiaddr
()
}
// LocalPeer is the Peer on this side
func
(
c
*
secureConn
)
LocalPeer
()
peer
.
ID
{
return
c
.
secure
.
LocalPeer
()
}
// RemotePeer is the Peer on the remote side
func
(
c
*
secureConn
)
RemotePeer
()
peer
.
ID
{
return
c
.
secure
.
RemotePeer
()
}
// LocalPrivateKey is the public key of the peer on this side
func
(
c
*
secureConn
)
LocalPrivateKey
()
ic
.
PrivKey
{
return
c
.
secure
.
LocalPrivateKey
()
}
// RemotePubKey is the public key of the peer on the remote side
func
(
c
*
secureConn
)
RemotePublicKey
()
ic
.
PubKey
{
return
c
.
secure
.
RemotePublicKey
()
}
// Read reads data, net.Conn style
func
(
c
*
secureConn
)
Read
(
buf
[]
byte
)
(
int
,
error
)
{
return
c
.
secure
.
ReadWriter
()
.
Read
(
buf
)
}
// Write writes data, net.Conn style
func
(
c
*
secureConn
)
Write
(
buf
[]
byte
)
(
int
,
error
)
{
return
c
.
secure
.
ReadWriter
()
.
Write
(
buf
)
}
// ReleaseMsg releases a buffer
func
(
c
*
secureConn
)
ReleaseMsg
(
m
[]
byte
)
{
c
.
secure
.
ReadWriter
()
.
ReleaseMsg
(
m
)
}
This diff is collapsed.
Click to expand it.
p2p/net/conn/secure_conn_test.go
deleted
100644 → 0
View file @
0aaec876
package
conn
import
(
"bytes"
"context"
"runtime"
"sync"
"testing"
"time"
ic
"github.com/ipfs/go-libp2p-crypto"
travis
"github.com/libp2p/go-libp2p/testutil/ci/travis"
)
func
upgradeToSecureConn
(
t
*
testing
.
T
,
ctx
context
.
Context
,
sk
ic
.
PrivKey
,
c
Conn
)
(
Conn
,
error
)
{
if
c
,
ok
:=
c
.
(
*
secureConn
);
ok
{
return
c
,
nil
}
// shouldn't happen, because dial + listen already return secure conns.
s
,
err
:=
newSecureConn
(
ctx
,
sk
,
c
)
if
err
!=
nil
{
return
nil
,
err
}
// need to read + write, as that's what triggers the handshake.
h
:=
[]
byte
(
"hello"
)
if
_
,
err
:=
s
.
Write
(
h
);
err
!=
nil
{
return
nil
,
err
}
if
_
,
err
:=
s
.
Read
(
h
);
err
!=
nil
{
return
nil
,
err
}
return
s
,
nil
}
func
secureHandshake
(
t
*
testing
.
T
,
ctx
context
.
Context
,
sk
ic
.
PrivKey
,
c
Conn
,
done
chan
error
)
{
_
,
err
:=
upgradeToSecureConn
(
t
,
ctx
,
sk
,
c
)
done
<-
err
}
func
TestSecureSimple
(
t
*
testing
.
T
)
{
// t.Skip("Skipping in favor of another test")
numMsgs
:=
100
if
testing
.
Short
()
{
numMsgs
=
10
}
ctx
:=
context
.
Background
()
c1
,
c2
,
p1
,
p2
:=
setupSingleConn
(
t
,
ctx
)
done
:=
make
(
chan
error
)
go
secureHandshake
(
t
,
ctx
,
p1
.
PrivKey
,
c1
,
done
)
go
secureHandshake
(
t
,
ctx
,
p2
.
PrivKey
,
c2
,
done
)
for
i
:=
0
;
i
<
2
;
i
++
{
if
err
:=
<-
done
;
err
!=
nil
{
t
.
Fatal
(
err
)
}
}
for
i
:=
0
;
i
<
numMsgs
;
i
++
{
testOneSendRecv
(
t
,
c1
,
c2
)
testOneSendRecv
(
t
,
c2
,
c1
)
}
c1
.
Close
()
c2
.
Close
()
}
func
TestSecureClose
(
t
*
testing
.
T
)
{
// t.Skip("Skipping in favor of another test")
ctx
:=
context
.
Background
()
c1
,
c2
,
p1
,
p2
:=
setupSingleConn
(
t
,
ctx
)
done
:=
make
(
chan
error
)
go
secureHandshake
(
t
,
ctx
,
p1
.
PrivKey
,
c1
,
done
)
go
secureHandshake
(
t
,
ctx
,
p2
.
PrivKey
,
c2
,
done
)
for
i
:=
0
;
i
<
2
;
i
++
{
if
err
:=
<-
done
;
err
!=
nil
{
t
.
Fatal
(
err
)
}
}
testOneSendRecv
(
t
,
c1
,
c2
)
c1
.
Close
()
testNotOneSendRecv
(
t
,
c1
,
c2
)
c2
.
Close
()
testNotOneSendRecv
(
t
,
c1
,
c2
)
testNotOneSendRecv
(
t
,
c2
,
c1
)
}
func
TestSecureCancelHandshake
(
t
*
testing
.
T
)
{
// t.Skip("Skipping in favor of another test")
ctx
,
cancel
:=
context
.
WithCancel
(
context
.
Background
())
c1
,
c2
,
p1
,
p2
:=
setupSingleConn
(
t
,
ctx
)
done
:=
make
(
chan
error
)
go
secureHandshake
(
t
,
ctx
,
p1
.
PrivKey
,
c1
,
done
)
time
.
Sleep
(
time
.
Millisecond
)
cancel
()
// cancel ctx
go
secureHandshake
(
t
,
ctx
,
p2
.
PrivKey
,
c2
,
done
)
for
i
:=
0
;
i
<
2
;
i
++
{
if
err
:=
<-
done
;
err
==
nil
{
t
.
Error
(
"cancel should've errored out"
)
}
}
}
func
TestSecureHandshakeFailsWithWrongKeys
(
t
*
testing
.
T
)
{
// t.Skip("Skipping in favor of another test")
ctx
,
cancel
:=
context
.
WithCancel
(
context
.
Background
())
defer
cancel
()
c1
,
c2
,
p1
,
p2
:=
setupSingleConn
(
t
,
ctx
)
done
:=
make
(
chan
error
)
go
secureHandshake
(
t
,
ctx
,
p2
.
PrivKey
,
c1
,
done
)
go
secureHandshake
(
t
,
ctx
,
p1
.
PrivKey
,
c2
,
done
)
for
i
:=
0
;
i
<
2
;
i
++
{
if
err
:=
<-
done
;
err
==
nil
{
t
.
Fatal
(
"wrong keys should've errored out."
)
}
}
}
func
TestSecureCloseLeak
(
t
*
testing
.
T
)
{
// t.Skip("Skipping in favor of another test")
if
testing
.
Short
()
{
t
.
SkipNow
()
}
if
travis
.
IsRunning
()
{
t
.
Skip
(
"this doesn't work well on travis"
)
}
runPair
:=
func
(
c1
,
c2
Conn
,
num
int
)
{
mc1
:=
msgioWrap
(
c1
)
mc2
:=
msgioWrap
(
c2
)
log
.
Debugf
(
"runPair %d"
,
num
)
for
i
:=
0
;
i
<
num
;
i
++
{
log
.
Debugf
(
"runPair iteration %d"
,
i
)
b1
:=
[]
byte
(
"beep"
)
mc1
.
WriteMsg
(
b1
)
b2
,
err
:=
mc2
.
ReadMsg
()
if
err
!=
nil
{
panic
(
err
)
}
if
!
bytes
.
Equal
(
b1
,
b2
)
{
panic
(
"bytes not equal"
)
}
b2
=
[]
byte
(
"beep"
)
mc2
.
WriteMsg
(
b2
)
b1
,
err
=
mc1
.
ReadMsg
()
if
err
!=
nil
{
panic
(
err
)
}
if
!
bytes
.
Equal
(
b1
,
b2
)
{
panic
(
"bytes not equal"
)
}
time
.
Sleep
(
time
.
Microsecond
*
5
)
}
}
var
cons
=
5
var
msgs
=
50
log
.
Debugf
(
"Running %d connections * %d msgs.
\n
"
,
cons
,
msgs
)
var
wg
sync
.
WaitGroup
for
i
:=
0
;
i
<
cons
;
i
++
{
wg
.
Add
(
1
)
ctx
,
cancel
:=
context
.
WithCancel
(
context
.
Background
())
c1
,
c2
,
_
,
_
:=
setupSecureConn
(
t
,
ctx
)
go
func
(
c1
,
c2
Conn
)
{
defer
func
()
{
c1
.
Close
()
c2
.
Close
()
cancel
()
wg
.
Done
()
}()
runPair
(
c1
,
c2
,
msgs
)
}(
c1
,
c2
)
}
log
.
Debugf
(
"Waiting..."
)
wg
.
Wait
()
// done!
time
.
Sleep
(
time
.
Millisecond
*
150
)
ngr
:=
runtime
.
NumGoroutine
()
if
ngr
>
25
{
// panic("uncomment me to debug")
t
.
Fatal
(
"leaking goroutines:"
,
ngr
)
}
}
This diff is collapsed.
Click to expand it.
p2p/net/interface.go
View file @
0f3ffb2d
...
...
@@ -8,8 +8,8 @@ import (
pstore
"github.com/ipfs/go-libp2p-peerstore"
ma
"github.com/jbenet/go-multiaddr"
"github.com/jbenet/goprocess"
conn
"github.com/libp2p/go-libp2p-conn"
protocol
"github.com/libp2p/go-libp2p-protocol"
conn
"github.com/libp2p/go-libp2p/p2p/net/conn"
)
// MessageSizeMax is a soft (recommended) maximum for network messages.
...
...
This diff is collapsed.
Click to expand it.
p2p/net/mock/mock_net.go
View file @
0f3ffb2d
package
mocknet
import
(
"context"
"fmt"
"sort"
"sync"
...
...
@@ -9,15 +10,14 @@ import (
bhost
"github.com/libp2p/go-libp2p/p2p/host/basic"
inet
"github.com/libp2p/go-libp2p/p2p/net"
p2putil
"github.com/libp2p/go-libp2p/p2p/test/util"
testutil
"github.com/libp2p/go-libp2p/testutil"
"context"
ic
"github.com/ipfs/go-libp2p-crypto"
peer
"github.com/ipfs/go-libp2p-peer"
pstore
"github.com/ipfs/go-libp2p-peerstore"
ma
"github.com/jbenet/go-multiaddr"
"github.com/jbenet/goprocess"
goprocessctx
"github.com/jbenet/goprocess/context"
testutil
"github.com/libp2p/go-testutil"
)
// mocknet implements mocknet.Mocknet
...
...
This diff is collapsed.
Click to expand it.
p2p/net/mock/mock_test.go
View file @
0f3ffb2d
...
...
@@ -2,6 +2,7 @@ package mocknet
import
(
"bytes"
"context"
"io"
"math"
"math/rand"
...
...
@@ -9,13 +10,12 @@ import (
"testing"
"time"
peer
"github.com/ipfs/go-libp2p-peer"
protocol
"github.com/libp2p/go-libp2p-protocol"
inet
"github.com/libp2p/go-libp2p/p2p/net"
testutil
"github.com/libp2p/go-libp2p/testutil"
"context
"
peer
"github.com/ipfs/go-libp2p-peer
"
detectrace
"github.com/jbenet/go-detect-race"
protocol
"github.com/libp2p/go-libp2p-protocol"
testutil
"github.com/libp2p/go-testutil"
)
func
randPeer
(
t
*
testing
.
T
)
peer
.
ID
{
...
...
This diff is collapsed.
Click to expand it.
p2p/net/swarm/addr/addr.go
deleted
100644 → 0
View file @
0aaec876
package
addrutil
import
(
"fmt"
"context"
logging
"github.com/ipfs/go-log"
ma
"github.com/jbenet/go-multiaddr"
manet
"github.com/jbenet/go-multiaddr-net"
_
"github.com/whyrusleeping/ws-transport"
)
var
log
=
logging
.
Logger
(
"github.com/libp2p/go-libp2p/p2p/net/swarm/addr"
)
// SupportedTransportStrings is the list of supported transports for the swarm.
// These are strings of encapsulated multiaddr protocols. E.g.:
// /ip4/tcp
var
SupportedTransportStrings
=
[]
string
{
"/ip4/tcp"
,
"/ip6/tcp"
,
"/ip4/udp/utp"
,
"/ip6/udp/utp"
,
"/ip4/tcp/ws"
,
"/ip6/tcp/ws"
,
// "/ip4/udp/udt", disabled because the lib doesnt work on arm
// "/ip6/udp/udt", disabled because the lib doesnt work on arm
}
// SupportedTransportProtocols is the list of supported transports for the swarm.
// These are []ma.Protocol lists. Populated at runtime from SupportedTransportStrings
var
SupportedTransportProtocols
=
[][]
ma
.
Protocol
{}
func
init
()
{
// initialize SupportedTransportProtocols
transports
:=
make
([][]
ma
.
Protocol
,
len
(
SupportedTransportStrings
))
for
_
,
s
:=
range
SupportedTransportStrings
{
t
,
err
:=
ma
.
ProtocolsWithString
(
s
)
if
err
!=
nil
{
panic
(
err
)
// important to fix this in the codebase
}
transports
=
append
(
transports
,
t
)
}
SupportedTransportProtocols
=
transports
}
// FilterAddrs is a filter that removes certain addresses, according the given filters.
// if all filters return true, the address is kept.
func
FilterAddrs
(
a
[]
ma
.
Multiaddr
,
filters
...
func
(
ma
.
Multiaddr
)
bool
)
[]
ma
.
Multiaddr
{
b
:=
make
([]
ma
.
Multiaddr
,
0
,
len
(
a
))
for
_
,
addr
:=
range
a
{
good
:=
true
for
_
,
filter
:=
range
filters
{
good
=
good
&&
filter
(
addr
)
}
if
good
{
b
=
append
(
b
,
addr
)
}
}
return
b
}
// FilterUsableAddrs removes certain addresses
// from a list. the addresses removed are those known NOT
// to work with our network. Namely, addresses with UTP.
func
FilterUsableAddrs
(
a
[]
ma
.
Multiaddr
)
[]
ma
.
Multiaddr
{
return
FilterAddrs
(
a
,
AddrUsableFunc
)
}
func
AddrUsableFunc
(
m
ma
.
Multiaddr
)
bool
{
return
AddrUsable
(
m
,
false
)
}
// AddrOverNonLocalIP returns whether the addr uses a non-local ip link
func
AddrOverNonLocalIP
(
a
ma
.
Multiaddr
)
bool
{
split
:=
ma
.
Split
(
a
)
if
len
(
split
)
<
1
{
return
false
}
if
manet
.
IsIP6LinkLocal
(
split
[
0
])
{
return
false
}
return
true
}
// AddrUsable returns whether our network can use this addr.
// We only use the transports in SupportedTransportStrings,
// and we do not link local addresses. Loopback is ok
// as we need to be able to connect to multiple ipfs nodes
// in the same machine.
func
AddrUsable
(
a
ma
.
Multiaddr
,
partial
bool
)
bool
{
if
a
==
nil
{
return
false
}
if
!
AddrOverNonLocalIP
(
a
)
{
return
false
}
// test the address protocol list is in SupportedTransportProtocols
matches
:=
func
(
supported
,
test
[]
ma
.
Protocol
)
bool
{
if
len
(
test
)
>
len
(
supported
)
{
return
false
}
// when partial, it's ok if test < supported.
if
!
partial
&&
len
(
supported
)
!=
len
(
test
)
{
return
false
}
for
i
:=
range
test
{
if
supported
[
i
]
.
Code
!=
test
[
i
]
.
Code
{
return
false
}
}
return
true
}
transport
:=
a
.
Protocols
()
for
_
,
supported
:=
range
SupportedTransportProtocols
{
if
matches
(
supported
,
transport
)
{
return
true
}
}
return
false
}
// ResolveUnspecifiedAddress expands an unspecified ip addresses (/ip4/0.0.0.0, /ip6/::) to
// use the known local interfaces. If ifaceAddr is nil, we request interface addresses
// from the network stack. (this is so you can provide a cached value if resolving many addrs)
func
ResolveUnspecifiedAddress
(
resolve
ma
.
Multiaddr
,
ifaceAddrs
[]
ma
.
Multiaddr
)
([]
ma
.
Multiaddr
,
error
)
{
// split address into its components
split
:=
ma
.
Split
(
resolve
)
// if first component (ip) is not unspecified, use it as is.
if
!
manet
.
IsIPUnspecified
(
split
[
0
])
{
return
[]
ma
.
Multiaddr
{
resolve
},
nil
}
out
:=
make
([]
ma
.
Multiaddr
,
0
,
len
(
ifaceAddrs
))
for
_
,
ia
:=
range
ifaceAddrs
{
// must match the first protocol to be resolve.
if
ia
.
Protocols
()[
0
]
.
Code
!=
resolve
.
Protocols
()[
0
]
.
Code
{
continue
}
split
[
0
]
=
ia
joined
:=
ma
.
Join
(
split
...
)
out
=
append
(
out
,
joined
)
log
.
Debug
(
"adding resolved addr:"
,
resolve
,
joined
,
out
)
}
if
len
(
out
)
<
1
{
return
nil
,
fmt
.
Errorf
(
"failed to resolve: %s"
,
resolve
)
}
return
out
,
nil
}
// ResolveUnspecifiedAddresses expands unspecified ip addresses (/ip4/0.0.0.0, /ip6/::) to
// use the known local interfaces.
func
ResolveUnspecifiedAddresses
(
unspecAddrs
,
ifaceAddrs
[]
ma
.
Multiaddr
)
([]
ma
.
Multiaddr
,
error
)
{
// todo optimize: only fetch these if we have a "any" addr.
if
len
(
ifaceAddrs
)
<
1
{
var
err
error
ifaceAddrs
,
err
=
InterfaceAddresses
()
if
err
!=
nil
{
return
nil
,
err
}
// log.Debug("InterfaceAddresses:", ifaceAddrs)
}
var
outputAddrs
[]
ma
.
Multiaddr
for
_
,
a
:=
range
unspecAddrs
{
// unspecified?
resolved
,
err
:=
ResolveUnspecifiedAddress
(
a
,
ifaceAddrs
)
if
err
!=
nil
{
continue
// optimistic. if we cant resolve anything, we'll know at the bottom.
}
// log.Debug("resolved:", a, resolved)
outputAddrs
=
append
(
outputAddrs
,
resolved
...
)
}
if
len
(
outputAddrs
)
<
1
{
return
nil
,
fmt
.
Errorf
(
"failed to specify addrs: %s"
,
unspecAddrs
)
}
log
.
Event
(
context
.
TODO
(),
"interfaceListenAddresses"
,
func
()
logging
.
Loggable
{
var
addrs
[]
string
for
_
,
addr
:=
range
outputAddrs
{
addrs
=
append
(
addrs
,
addr
.
String
())
}
return
logging
.
Metadata
{
"addresses"
:
addrs
}
}())
log
.
Debug
(
"ResolveUnspecifiedAddresses:"
,
unspecAddrs
,
ifaceAddrs
,
outputAddrs
)
return
outputAddrs
,
nil
}
// InterfaceAddresses returns a list of addresses associated with local machine
// Note: we do not return link local addresses. IP loopback is ok, because we
// may be connecting to other nodes in the same machine.
func
InterfaceAddresses
()
([]
ma
.
Multiaddr
,
error
)
{
maddrs
,
err
:=
manet
.
InterfaceMultiaddrs
()
if
err
!=
nil
{
return
nil
,
err
}
log
.
Debug
(
"InterfaceAddresses: from manet:"
,
maddrs
)
var
out
[]
ma
.
Multiaddr
for
_
,
a
:=
range
maddrs
{
if
!
AddrUsable
(
a
,
true
)
{
// partial
// log.Debug("InterfaceAddresses: skipping unusable:", a)
continue
}
out
=
append
(
out
,
a
)
}
log
.
Debug
(
"InterfaceAddresses: usable:"
,
out
)
return
out
,
nil
}
// AddrInList returns whether or not an address is part of a list.
// this is useful to check if NAT is happening (or other bugs?)
func
AddrInList
(
addr
ma
.
Multiaddr
,
list
[]
ma
.
Multiaddr
)
bool
{
for
_
,
addr2
:=
range
list
{
if
addr
.
Equal
(
addr2
)
{
return
true
}
}
return
false
}
// AddrIsShareableOnWAN returns whether the given address should be shareable on the
// wide area network (wide internet).
func
AddrIsShareableOnWAN
(
addr
ma
.
Multiaddr
)
bool
{
s
:=
ma
.
Split
(
addr
)
if
len
(
s
)
<
1
{
return
false
}
a
:=
s
[
0
]
if
manet
.
IsIPLoopback
(
a
)
||
manet
.
IsIP6LinkLocal
(
a
)
||
manet
.
IsIPUnspecified
(
a
)
{
return
false
}
return
manet
.
IsThinWaist
(
a
)
}
// WANShareableAddrs filters addresses based on whether they're shareable on WAN
func
WANShareableAddrs
(
inp
[]
ma
.
Multiaddr
)
[]
ma
.
Multiaddr
{
return
FilterAddrs
(
inp
,
AddrIsShareableOnWAN
)
}
// Subtract filters out all addrs in b from a
func
Subtract
(
a
,
b
[]
ma
.
Multiaddr
)
[]
ma
.
Multiaddr
{
return
FilterAddrs
(
a
,
func
(
m
ma
.
Multiaddr
)
bool
{
for
_
,
bb
:=
range
b
{
if
m
.
Equal
(
bb
)
{
return
false
}
}
return
true
})
}
// CheckNATWarning checks if our observed addresses differ. if so,
// informs the user that certain things might not work yet
func
CheckNATWarning
(
observed
,
expected
ma
.
Multiaddr
,
listen
[]
ma
.
Multiaddr
)
{
if
observed
.
Equal
(
expected
)
{
return
}
if
!
AddrInList
(
observed
,
listen
)
{
// probably a nat
log
.
Warningf
(
natWarning
,
observed
,
listen
)
}
}
const
natWarning
=
`Remote peer observed our address to be: %s
The local addresses are: %s
Thus, connection is going through NAT, and other connections may fail.
IPFS NAT traversal is still under development. Please bug us on github or irc to fix this.
Baby steps: http://jbenet.static.s3.amazonaws.com/271dfcf/baby-steps.gif
`
This diff is collapsed.
Click to expand it.
p2p/net/swarm/addr/addr_test.go
deleted
100644 → 0
View file @
0aaec876
package
addrutil
import
(
"testing"
ma
"github.com/jbenet/go-multiaddr"
manet
"github.com/jbenet/go-multiaddr-net"
)
func
newMultiaddr
(
t
*
testing
.
T
,
s
string
)
ma
.
Multiaddr
{
maddr
,
err
:=
ma
.
NewMultiaddr
(
s
)
if
err
!=
nil
{
t
.
Fatal
(
err
)
}
return
maddr
}
func
TestFilterAddrs
(
t
*
testing
.
T
)
{
bad
:=
[]
ma
.
Multiaddr
{
newMultiaddr
(
t
,
"/ip4/1.2.3.4/udp/1234"
),
// unreliable
newMultiaddr
(
t
,
"/ip4/1.2.3.4/udp/1234/sctp/1234"
),
// not in manet
newMultiaddr
(
t
,
"/ip4/1.2.3.4/udp/1234/udt"
),
// udt is broken on arm
newMultiaddr
(
t
,
"/ip6/fe80::1/tcp/1234"
),
// link local
newMultiaddr
(
t
,
"/ip6/fe80::100/tcp/1234"
),
// link local
}
good
:=
[]
ma
.
Multiaddr
{
newMultiaddr
(
t
,
"/ip4/127.0.0.1/tcp/1234"
),
newMultiaddr
(
t
,
"/ip6/::1/tcp/1234"
),
newMultiaddr
(
t
,
"/ip4/1.2.3.4/udp/1234/utp"
),
newMultiaddr
(
t
,
"/ip4/1.2.3.4/tcp/1234/ws"
),
}
goodAndBad
:=
append
(
good
,
bad
...
)
// test filters
for
_
,
a
:=
range
bad
{
if
AddrUsable
(
a
,
false
)
{
t
.
Errorf
(
"addr %s should be unusable"
,
a
)
}
}
for
_
,
a
:=
range
good
{
if
!
AddrUsable
(
a
,
false
)
{
t
.
Errorf
(
"addr %s should be usable"
,
a
)
}
}
subtestAddrsEqual
(
t
,
FilterUsableAddrs
(
bad
),
[]
ma
.
Multiaddr
{})
subtestAddrsEqual
(
t
,
FilterUsableAddrs
(
good
),
good
)
subtestAddrsEqual
(
t
,
FilterUsableAddrs
(
goodAndBad
),
good
)
}
func
subtestAddrsEqual
(
t
*
testing
.
T
,
a
,
b
[]
ma
.
Multiaddr
)
{
if
len
(
a
)
!=
len
(
b
)
{
t
.
Error
(
t
)
}
in
:=
func
(
addr
ma
.
Multiaddr
,
l
[]
ma
.
Multiaddr
)
bool
{
for
_
,
addr2
:=
range
l
{
if
addr
.
Equal
(
addr2
)
{
return
true
}
}
return
false
}
for
_
,
aa
:=
range
a
{
if
!
in
(
aa
,
b
)
{
t
.
Errorf
(
"%s not in %s"
,
aa
,
b
)
}
}
}
func
TestInterfaceAddrs
(
t
*
testing
.
T
)
{
addrs
,
err
:=
InterfaceAddresses
()
if
err
!=
nil
{
t
.
Fatal
(
err
)
}
if
len
(
addrs
)
<
1
{
t
.
Error
(
"no addresses"
)
}
for
_
,
a
:=
range
addrs
{
if
manet
.
IsIP6LinkLocal
(
a
)
{
t
.
Error
(
"should not return ip link local addresses"
,
a
)
}
}
if
len
(
addrs
)
<
1
{
t
.
Error
(
"no good interface addrs"
)
}
}
func
TestResolvingAddrs
(
t
*
testing
.
T
)
{
unspec
:=
[]
ma
.
Multiaddr
{
newMultiaddr
(
t
,
"/ip4/0.0.0.0/tcp/1234"
),
newMultiaddr
(
t
,
"/ip4/1.2.3.4/tcp/1234"
),
newMultiaddr
(
t
,
"/ip6/::/tcp/1234"
),
newMultiaddr
(
t
,
"/ip6/::100/tcp/1234"
),
}
iface
:=
[]
ma
.
Multiaddr
{
newMultiaddr
(
t
,
"/ip4/127.0.0.1"
),
newMultiaddr
(
t
,
"/ip4/10.20.30.40"
),
newMultiaddr
(
t
,
"/ip6/::1"
),
newMultiaddr
(
t
,
"/ip6/::f"
),
}
spec
:=
[]
ma
.
Multiaddr
{
newMultiaddr
(
t
,
"/ip4/127.0.0.1/tcp/1234"
),
newMultiaddr
(
t
,
"/ip4/10.20.30.40/tcp/1234"
),
newMultiaddr
(
t
,
"/ip4/1.2.3.4/tcp/1234"
),
newMultiaddr
(
t
,
"/ip6/::1/tcp/1234"
),
newMultiaddr
(
t
,
"/ip6/::f/tcp/1234"
),
newMultiaddr
(
t
,
"/ip6/::100/tcp/1234"
),
}
actual
,
err
:=
ResolveUnspecifiedAddresses
(
unspec
,
iface
)
if
err
!=
nil
{
t
.
Fatal
(
err
)
}
for
i
,
a
:=
range
actual
{
if
!
a
.
Equal
(
spec
[
i
])
{
t
.
Error
(
a
,
" != "
,
spec
[
i
])
}
}
ip4u
:=
[]
ma
.
Multiaddr
{
newMultiaddr
(
t
,
"/ip4/0.0.0.0"
)}
ip4i
:=
[]
ma
.
Multiaddr
{
newMultiaddr
(
t
,
"/ip4/1.2.3.4"
)}
ip6u
:=
[]
ma
.
Multiaddr
{
newMultiaddr
(
t
,
"/ip6/::"
)}
ip6i
:=
[]
ma
.
Multiaddr
{
newMultiaddr
(
t
,
"/ip6/::1"
)}
if
_
,
err
:=
ResolveUnspecifiedAddress
(
ip4u
[
0
],
ip6i
);
err
==
nil
{
t
.
Fatal
(
"should have failed"
)
}
if
_
,
err
:=
ResolveUnspecifiedAddress
(
ip6u
[
0
],
ip4i
);
err
==
nil
{
t
.
Fatal
(
"should have failed"
)
}
if
_
,
err
:=
ResolveUnspecifiedAddresses
(
ip6u
,
ip4i
);
err
==
nil
{
t
.
Fatal
(
"should have failed"
)
}
if
_
,
err
:=
ResolveUnspecifiedAddresses
(
ip4u
,
ip6i
);
err
==
nil
{
t
.
Fatal
(
"should have failed"
)
}
}
func
TestWANShareable
(
t
*
testing
.
T
)
{
wanok
:=
[]
ma
.
Multiaddr
{
newMultiaddr
(
t
,
"/ip4/1.2.3.4/tcp/1234"
),
newMultiaddr
(
t
,
"/ip6/abcd::1/tcp/1234"
),
}
wanbad
:=
[]
ma
.
Multiaddr
{
newMultiaddr
(
t
,
"/ip4/127.0.0.1/tcp/1234"
),
newMultiaddr
(
t
,
"/ip4/0.0.0.0/tcp/1234"
),
newMultiaddr
(
t
,
"/ip6/::1/tcp/1234"
),
newMultiaddr
(
t
,
"/ip6/::/tcp/1234"
),
newMultiaddr
(
t
,
"/ip6/fe80::1/tcp/1234"
),
newMultiaddr
(
t
,
"/ip6/fe80::/tcp/1234"
),
}
for
_
,
a
:=
range
wanok
{
if
!
AddrIsShareableOnWAN
(
a
)
{
t
.
Error
(
"should be true"
,
a
)
}
}
for
_
,
a
:=
range
wanbad
{
if
AddrIsShareableOnWAN
(
a
)
{
t
.
Error
(
"should be false"
,
a
)
}
}
wanok2
:=
WANShareableAddrs
(
wanok
)
if
len
(
wanok
)
!=
len
(
wanok2
)
{
t
.
Error
(
"should be the same"
)
}
wanbad2
:=
WANShareableAddrs
(
wanbad
)
if
len
(
wanbad2
)
!=
0
{
t
.
Error
(
"should be zero"
)
}
}
func
TestSubtract
(
t
*
testing
.
T
)
{
a
:=
[]
ma
.
Multiaddr
{
newMultiaddr
(
t
,
"/ip4/127.0.0.1/tcp/1234"
),
newMultiaddr
(
t
,
"/ip4/0.0.0.0/tcp/1234"
),
newMultiaddr
(
t
,
"/ip6/::1/tcp/1234"
),
newMultiaddr
(
t
,
"/ip6/::/tcp/1234"
),
newMultiaddr
(
t
,
"/ip6/fe80::1/tcp/1234"
),
newMultiaddr
(
t
,
"/ip6/fe80::/tcp/1234"
),
}
b
:=
[]
ma
.
Multiaddr
{
newMultiaddr
(
t
,
"/ip4/127.0.0.1/tcp/1234"
),
newMultiaddr
(
t
,
"/ip6/::1/tcp/1234"
),
newMultiaddr
(
t
,
"/ip6/fe80::1/tcp/1234"
),
}
c1
:=
[]
ma
.
Multiaddr
{
newMultiaddr
(
t
,
"/ip4/0.0.0.0/tcp/1234"
),
newMultiaddr
(
t
,
"/ip6/::/tcp/1234"
),
newMultiaddr
(
t
,
"/ip6/fe80::/tcp/1234"
),
}
c2
:=
Subtract
(
a
,
b
)
if
len
(
c1
)
!=
len
(
c2
)
{
t
.
Error
(
"should be the same"
)
}
for
i
,
ca
:=
range
c1
{
if
!
c2
[
i
]
.
Equal
(
ca
)
{
t
.
Error
(
"should be the same"
,
ca
,
c2
[
i
])
}
}
}
This diff is collapsed.
Click to expand it.
p2p/net/swarm/addr/filter.go
deleted
100644 → 0
View file @
0aaec876
package
addrutil
import
(
ma
"github.com/jbenet/go-multiaddr"
mafmt
"github.com/whyrusleeping/mafmt"
)
// SubtractFilter returns a filter func that filters all of the given addresses
func
SubtractFilter
(
addrs
...
ma
.
Multiaddr
)
func
(
ma
.
Multiaddr
)
bool
{
addrmap
:=
make
(
map
[
string
]
bool
)
for
_
,
a
:=
range
addrs
{
addrmap
[
string
(
a
.
Bytes
())]
=
true
}
return
func
(
a
ma
.
Multiaddr
)
bool
{
return
!
addrmap
[
string
(
a
.
Bytes
())]
}
}
// IsFDCostlyTransport returns true for transports that require a new file
// descriptor per connection created
func
IsFDCostlyTransport
(
a
ma
.
Multiaddr
)
bool
{
return
mafmt
.
TCP
.
Matches
(
a
)
}
// FilterNeg returns a negated version of the passed in filter
func
FilterNeg
(
f
func
(
ma
.
Multiaddr
)
bool
)
func
(
ma
.
Multiaddr
)
bool
{
return
func
(
a
ma
.
Multiaddr
)
bool
{
return
!
f
(
a
)
}
}
This diff is collapsed.
Click to expand it.
p2p/net/swarm/dial_test.go
View file @
0f3ffb2d
package
swarm
import
(
"context"
"net"
"sync"
"testing"
"time"
addrutil
"github.com/libp2p/go-libp2p/p2p/net/swarm/addr"
testutil
"github.com/libp2p/go-libp2p/testutil"
ci
"github.com/libp2p/go-libp2p/testutil/ci"
"context"
peer
"github.com/ipfs/go-libp2p-peer"
pstore
"github.com/ipfs/go-libp2p-peerstore"
ma
"github.com/jbenet/go-multiaddr"
manet
"github.com/jbenet/go-multiaddr-net"
addrutil
"github.com/libp2p/go-addr-util"
testutil
"github.com/libp2p/go-testutil"
ci
"github.com/libp2p/go-testutil/ci"
)
func
closeSwarms
(
swarms
[]
*
Swarm
)
{
...
...
This diff is collapsed.
Click to expand it.
p2p/net/swarm/limiter.go
View file @
0f3ffb2d
...
...
@@ -6,9 +6,8 @@ import (
peer
"github.com/ipfs/go-libp2p-peer"
ma
"github.com/jbenet/go-multiaddr"
conn
"github.com/libp2p/go-libp2p/p2p/net/conn"
addrutil
"github.com/libp2p/go-libp2p/p2p/net/swarm/addr"
addrutil
"github.com/libp2p/go-addr-util"
conn
"github.com/libp2p/go-libp2p-conn"
)
type
dialResult
struct
{
...
...
This diff is collapsed.
Click to expand it.
p2p/net/swarm/limiter_test.go
View file @
0f3ffb2d
...
...
@@ -10,9 +10,8 @@ import (
peer
"github.com/ipfs/go-libp2p-peer"
ma
"github.com/jbenet/go-multiaddr"
conn
"github.com/libp2p/go-libp2p-conn"
mafmt
"github.com/whyrusleeping/mafmt"
conn
"github.com/libp2p/go-libp2p/p2p/net/conn"
)
func
mustAddr
(
t
*
testing
.
T
,
s
string
)
ma
.
Multiaddr
{
...
...
This diff is collapsed.
Click to expand it.
p2p/net/swarm/simul_test.go
View file @
0f3ffb2d
package
swarm
import
(
"context"
"runtime"
"sync"
"testing"
"time"
ci
"github.com/libp2p/go-libp2p/testutil/ci"
"context"
peer
"github.com/ipfs/go-libp2p-peer"
pstore
"github.com/ipfs/go-libp2p-peerstore"
ma
"github.com/jbenet/go-multiaddr"
ci
"github.com/libp2p/go-testutil/ci"
)
func
TestSimultOpen
(
t
*
testing
.
T
)
{
...
...
This diff is collapsed.
Click to expand it.
p2p/net/swarm/swarm.go
View file @
0f3ffb2d
...
...
@@ -14,8 +14,6 @@ import (
metrics
"github.com/libp2p/go-libp2p/p2p/metrics"
mconn
"github.com/libp2p/go-libp2p/p2p/metrics/conn"
inet
"github.com/libp2p/go-libp2p/p2p/net"
conn
"github.com/libp2p/go-libp2p/p2p/net/conn"
addrutil
"github.com/libp2p/go-libp2p/p2p/net/swarm/addr"
ci
"github.com/ipfs/go-libp2p-crypto"
peer
"github.com/ipfs/go-libp2p-peer"
...
...
@@ -26,6 +24,8 @@ import (
pst
"github.com/jbenet/go-stream-muxer"
"github.com/jbenet/goprocess"
goprocessctx
"github.com/jbenet/goprocess/context"
addrutil
"github.com/libp2p/go-addr-util"
conn
"github.com/libp2p/go-libp2p-conn"
transport
"github.com/libp2p/go-libp2p-transport"
filter
"github.com/libp2p/go-maddr-filter"
tcpt
"github.com/libp2p/go-tcp-transport"
...
...
This diff is collapsed.
Click to expand it.
Prev
1
2
Next
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
.
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment
Menu
Projects
Groups
Snippets
Help