Commit 49b35719 authored by Jeromy's avatar Jeromy Committed by Juan Batiz-Benet
Browse files

filter incoming connections and add a test of functionality



- add extra check to dialblock test
- move filter to separate package
- also improved tests
- sunk filters down into p2p/net/conn/listener

License: MIT
Signed-off-by: default avatarJeromy <jeromyj@gmail.com>
Signed-off-by: default avatarJuan Batiz-Benet <juan@benet.ai>
parent 9a3d287c
...@@ -7,6 +7,7 @@ import ( ...@@ -7,6 +7,7 @@ import (
key "github.com/ipfs/go-ipfs/blocks/key" key "github.com/ipfs/go-ipfs/blocks/key"
ic "github.com/ipfs/go-ipfs/p2p/crypto" ic "github.com/ipfs/go-ipfs/p2p/crypto"
filter "github.com/ipfs/go-ipfs/p2p/net/filter"
peer "github.com/ipfs/go-ipfs/p2p/peer" peer "github.com/ipfs/go-ipfs/p2p/peer"
msgio "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-msgio" msgio "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-msgio"
...@@ -86,6 +87,8 @@ type Listener interface { ...@@ -86,6 +87,8 @@ type Listener interface {
// LocalPeer is the identity of the local Peer. // LocalPeer is the identity of the local Peer.
LocalPeer() peer.ID LocalPeer() peer.ID
SetAddrFilters(*filter.Filters)
// Close closes the listener. // Close closes the listener.
// Any blocked Accept operations will be unblocked and return errors. // Any blocked Accept operations will be unblocked and return errors.
Close() error Close() error
......
...@@ -13,6 +13,7 @@ import ( ...@@ -13,6 +13,7 @@ import (
context "github.com/ipfs/go-ipfs/Godeps/_workspace/src/golang.org/x/net/context" context "github.com/ipfs/go-ipfs/Godeps/_workspace/src/golang.org/x/net/context"
ic "github.com/ipfs/go-ipfs/p2p/crypto" ic "github.com/ipfs/go-ipfs/p2p/crypto"
filter "github.com/ipfs/go-ipfs/p2p/net/filter"
peer "github.com/ipfs/go-ipfs/p2p/peer" peer "github.com/ipfs/go-ipfs/p2p/peer"
) )
...@@ -26,6 +27,8 @@ type listener struct { ...@@ -26,6 +27,8 @@ type listener struct {
local peer.ID // LocalPeer is the identity of the local Peer local peer.ID // LocalPeer is the identity of the local Peer
privk ic.PrivKey // private key to use to initialize secure conns privk ic.PrivKey // private key to use to initialize secure conns
filters *filter.Filters
wrapper ConnWrapper wrapper ConnWrapper
cg ctxgroup.ContextGroup cg ctxgroup.ContextGroup
...@@ -45,6 +48,10 @@ func (l *listener) String() string { ...@@ -45,6 +48,10 @@ func (l *listener) String() string {
return fmt.Sprintf("<Listener %s %s>", l.local, l.Multiaddr()) return fmt.Sprintf("<Listener %s %s>", l.local, l.Multiaddr())
} }
func (l *listener) SetAddrFilters(fs *filter.Filters) {
l.filters = fs
}
// Accept waits for and returns the next connection to the listener. // Accept waits for and returns the next connection to the listener.
// Note that unfortunately this // Note that unfortunately this
func (l *listener) Accept() (net.Conn, error) { func (l *listener) Accept() (net.Conn, error) {
...@@ -81,6 +88,12 @@ func (l *listener) Accept() (net.Conn, error) { ...@@ -81,6 +88,12 @@ func (l *listener) Accept() (net.Conn, error) {
} }
log.Debugf("listener %s got connection: %s <---> %s", l, maconn.LocalMultiaddr(), maconn.RemoteMultiaddr()) log.Debugf("listener %s got connection: %s <---> %s", l, maconn.LocalMultiaddr(), maconn.RemoteMultiaddr())
if l.filters != nil && l.filters.AddrBlocked(maconn.RemoteMultiaddr()) {
log.Debugf("blocked connection from %s", maconn.RemoteMultiaddr())
maconn.Close()
continue
}
// If we have a wrapper func, wrap this conn // If we have a wrapper func, wrap this conn
if l.wrapper != nil { if l.wrapper != nil {
maconn = l.wrapper(maconn) maconn = l.wrapper(maconn)
......
package filter
import (
"net"
"strings"
ma "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr"
manet "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr-net"
)
type Filters struct {
filters []*net.IPNet
}
func (fs *Filters) AddDialFilter(f *net.IPNet) {
fs.filters = append(fs.filters, f)
}
func (f *Filters) AddrBlocked(a ma.Multiaddr) bool {
_, addr, err := manet.DialArgs(a)
if err != nil {
// if we cant parse it, its probably not blocked
return false
}
ipstr := strings.Split(addr, ":")[0]
ip := net.ParseIP(ipstr)
for _, ft := range f.filters {
if ft.Contains(ip) {
return true
}
}
return false
}
...@@ -4,19 +4,18 @@ package swarm ...@@ -4,19 +4,18 @@ package swarm
import ( import (
"fmt" "fmt"
"net"
"sync" "sync"
"time" "time"
metrics "github.com/ipfs/go-ipfs/metrics" metrics "github.com/ipfs/go-ipfs/metrics"
inet "github.com/ipfs/go-ipfs/p2p/net" inet "github.com/ipfs/go-ipfs/p2p/net"
filter "github.com/ipfs/go-ipfs/p2p/net/filter"
addrutil "github.com/ipfs/go-ipfs/p2p/net/swarm/addr" addrutil "github.com/ipfs/go-ipfs/p2p/net/swarm/addr"
peer "github.com/ipfs/go-ipfs/p2p/peer" peer "github.com/ipfs/go-ipfs/p2p/peer"
eventlog "github.com/ipfs/go-ipfs/thirdparty/eventlog" eventlog "github.com/ipfs/go-ipfs/thirdparty/eventlog"
ctxgroup "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-ctxgroup" ctxgroup "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-ctxgroup"
ma "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr" ma "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr"
manet "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr-net"
ps "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-peerstream" ps "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-peerstream"
pst "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-peerstream/transport" pst "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-peerstream/transport"
psy "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-peerstream/transport/yamux" psy "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-peerstream/transport/yamux"
...@@ -53,7 +52,7 @@ type Swarm struct { ...@@ -53,7 +52,7 @@ type Swarm struct {
notifs map[inet.Notifiee]ps.Notifiee notifs map[inet.Notifiee]ps.Notifiee
// filters for addresses that shouldnt be dialed // filters for addresses that shouldnt be dialed
Filters *Filters Filters *filter.Filters
cg ctxgroup.ContextGroup cg ctxgroup.ContextGroup
bwc metrics.Reporter bwc metrics.Reporter
...@@ -76,7 +75,7 @@ func NewSwarm(ctx context.Context, listenAddrs []ma.Multiaddr, ...@@ -76,7 +75,7 @@ func NewSwarm(ctx context.Context, listenAddrs []ma.Multiaddr,
dialT: DialTimeout, dialT: DialTimeout,
notifs: make(map[inet.Notifiee]ps.Notifiee), notifs: make(map[inet.Notifiee]ps.Notifiee),
bwc: bwc, bwc: bwc,
Filters: new(Filters), Filters: new(filter.Filters),
} }
// configure Swarm // configure Swarm
...@@ -90,30 +89,6 @@ func (s *Swarm) teardown() error { ...@@ -90,30 +89,6 @@ func (s *Swarm) teardown() error {
return s.swarm.Close() return s.swarm.Close()
} }
type Filters struct {
filters []*net.IPNet
}
func (fs *Filters) AddDialFilter(f *net.IPNet) {
fs.filters = append(fs.filters, f)
}
func (f *Filters) AddrBlocked(a ma.Multiaddr) bool {
_, addr, err := manet.DialArgs(a)
if err != nil {
// if we cant parse it, its probably not blocked
return false
}
ip := net.ParseIP(addr)
for _, ft := range f.filters {
if ft.Contains(ip) {
return true
}
}
return false
}
// CtxGroup returns the Context Group of the swarm // CtxGroup returns the Context Group of the swarm
func filterAddrs(listenAddrs []ma.Multiaddr) ([]ma.Multiaddr, error) { func filterAddrs(listenAddrs []ma.Multiaddr) ([]ma.Multiaddr, error) {
if len(listenAddrs) > 0 { if len(listenAddrs) > 0 {
......
...@@ -303,7 +303,6 @@ func (s *Swarm) dial(ctx context.Context, p peer.ID) (*Conn, error) { ...@@ -303,7 +303,6 @@ func (s *Swarm) dial(ctx context.Context, p peer.ID) (*Conn, error) {
ila, _ := s.InterfaceListenAddresses() ila, _ := s.InterfaceListenAddresses()
remoteAddrs = addrutil.Subtract(remoteAddrs, ila) remoteAddrs = addrutil.Subtract(remoteAddrs, ila)
remoteAddrs = addrutil.Subtract(remoteAddrs, s.peers.Addrs(s.local)) remoteAddrs = addrutil.Subtract(remoteAddrs, s.peers.Addrs(s.local))
remoteAddrs = s.filterAddrs(remoteAddrs)
log.Debugf("%s swarm dialing %s -- local:%s remote:%s", s.local, p, s.ListenAddresses(), remoteAddrs) log.Debugf("%s swarm dialing %s -- local:%s remote:%s", s.local, p, s.ListenAddresses(), remoteAddrs)
if len(remoteAddrs) == 0 { if len(remoteAddrs) == 0 {
...@@ -312,6 +311,13 @@ func (s *Swarm) dial(ctx context.Context, p peer.ID) (*Conn, error) { ...@@ -312,6 +311,13 @@ func (s *Swarm) dial(ctx context.Context, p peer.ID) (*Conn, error) {
return nil, err return nil, err
} }
remoteAddrs = s.filterAddrs(remoteAddrs)
if len(remoteAddrs) == 0 {
err := errors.New("all adresses for peer have been filtered out")
logdial["error"] = err
return nil, err
}
// open connection to peer // open connection to peer
d := &conn.Dialer{ d := &conn.Dialer{
Dialer: manet.Dialer{ Dialer: manet.Dialer{
......
...@@ -69,6 +69,8 @@ func (s *Swarm) setupListener(maddr ma.Multiaddr) error { ...@@ -69,6 +69,8 @@ func (s *Swarm) setupListener(maddr ma.Multiaddr) error {
return err return err
} }
list.SetAddrFilters(s.Filters)
if cw, ok := list.(conn.ListenerConnWrapper); ok { if cw, ok := list.(conn.ListenerConnWrapper); ok {
cw.SetConnWrapper(func(c manet.Conn) manet.Conn { cw.SetConnWrapper(func(c manet.Conn) manet.Conn {
return mconn.WrapConn(s.bwc, c) return mconn.WrapConn(s.bwc, c)
......
...@@ -4,6 +4,7 @@ import ( ...@@ -4,6 +4,7 @@ import (
"bytes" "bytes"
"fmt" "fmt"
"io" "io"
"net"
"sync" "sync"
"testing" "testing"
"time" "time"
...@@ -270,3 +271,60 @@ func TestConnHandler(t *testing.T) { ...@@ -270,3 +271,60 @@ func TestConnHandler(t *testing.T) {
default: default:
} }
} }
func TestAddrBlocking(t *testing.T) {
ctx := context.Background()
swarms := makeSwarms(ctx, t, 2)
swarms[0].SetConnHandler(func(conn *Conn) {
t.Fatal("no connections should happen!")
})
_, block, err := net.ParseCIDR("127.0.0.1/8")
if err != nil {
t.Fatal(err)
}
swarms[1].Filters.AddDialFilter(block)
swarms[1].peers.AddAddr(swarms[0].LocalPeer(), swarms[0].ListenAddresses()[0], peer.PermanentAddrTTL)
_, err = swarms[1].Dial(context.TODO(), swarms[0].LocalPeer())
if err == nil {
t.Fatal("dial should have failed")
}
swarms[0].peers.AddAddr(swarms[1].LocalPeer(), swarms[1].ListenAddresses()[0], peer.PermanentAddrTTL)
_, err = swarms[0].Dial(context.TODO(), swarms[1].LocalPeer())
if err == nil {
t.Fatal("dial should have failed")
}
}
func TestFilterBounds(t *testing.T) {
ctx := context.Background()
swarms := makeSwarms(ctx, t, 2)
conns := make(chan struct{}, 8)
swarms[0].SetConnHandler(func(conn *Conn) {
conns <- struct{}{}
})
// Address that we wont be dialing from
_, block, err := net.ParseCIDR("192.0.0.1/8")
if err != nil {
t.Fatal(err)
}
// set filter on both sides, shouldnt matter
swarms[1].Filters.AddDialFilter(block)
swarms[0].Filters.AddDialFilter(block)
connectSwarms(t, ctx, swarms)
select {
case <-time.After(time.Second):
t.Fatal("should have gotten connection")
case <-conns:
fmt.Println("got connect")
}
}
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment