Commit b938ab9a authored by Jeromy's avatar Jeromy
Browse files

Handle incoming conns in their own goroutines

Doing the multistream negotiation in sync causes hanging issues.
This commit accepts transport connections and starts the negotiation
in a separate goroutine, sending it down a channel when its ready.
parent 59928fba
...@@ -397,3 +397,55 @@ func TestFailedAccept(t *testing.T) { ...@@ -397,3 +397,55 @@ func TestFailedAccept(t *testing.T) {
c.Close() c.Close()
<-done <-done
} }
func TestHangingAccept(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
p1 := tu.RandPeerNetParamsOrFatal(t)
l1, err := Listen(ctx, p1.Addr, p1.ID, p1.PrivKey)
if err != nil {
t.Fatal(err)
}
p1.Addr = l1.Multiaddr() // Addr has been determined by kernel.
done := make(chan struct{})
go func() {
defer close(done)
con, err := net.Dial("tcp", l1.Addr().String())
if err != nil {
t.Error("first dial failed: ", err)
}
// hang this connection
defer con.Close()
// ensure that the first conn hits first
time.Sleep(time.Millisecond * 50)
con2, err := net.Dial("tcp", l1.Addr().String())
if err != nil {
t.Error("second dial failed: ", err)
}
defer con2.Close()
err = msmux.SelectProtoOrFail(SecioTag, con2)
if err != nil {
t.Error("msmux select failed: ", err)
}
_, err = con2.Write([]byte("test"))
if err != nil {
t.Error("con write failed: ", err)
}
}()
c, err := l1.Accept()
if err != nil {
t.Fatal("connections after a failed accept should still work: ", err)
}
c.Close()
<-done
}
...@@ -4,6 +4,8 @@ import ( ...@@ -4,6 +4,8 @@ import (
"fmt" "fmt"
"io" "io"
"net" "net"
"sync"
"time"
ic "github.com/ipfs/go-libp2p/p2p/crypto" ic "github.com/ipfs/go-libp2p/p2p/crypto"
filter "github.com/ipfs/go-libp2p/p2p/net/filter" filter "github.com/ipfs/go-libp2p/p2p/net/filter"
...@@ -20,6 +22,26 @@ import ( ...@@ -20,6 +22,26 @@ import (
const SecioTag = "/secio/1.0.0" const SecioTag = "/secio/1.0.0"
const NoEncryptionTag = "/plaintext/1.0.0" const NoEncryptionTag = "/plaintext/1.0.0"
const connAcceptBuffer = 32
const NegotiateReadTimeout = time.Second * 20
var catcher = tec.TempErrCatcher{
IsTemp: func(e error) bool {
// ignore connection breakages up to this point. but log them
if e == io.EOF {
log.Debugf("listener ignoring conn with EOF: %s", e)
return true
}
te, ok := e.(tec.Temporary)
if ok {
log.Debugf("listener ignoring conn with temporary err: %s", e)
return te.Temporary()
}
return false
},
}
// ConnWrapper is any function that wraps a raw multiaddr connection // ConnWrapper is any function that wraps a raw multiaddr connection
type ConnWrapper func(transport.Conn) transport.Conn type ConnWrapper func(transport.Conn) transport.Conn
...@@ -37,6 +59,10 @@ type listener struct { ...@@ -37,6 +59,10 @@ type listener struct {
proc goprocess.Process proc goprocess.Process
mux *msmux.MultistreamMuxer mux *msmux.MultistreamMuxer
incoming chan transport.Conn
ctx context.Context
} }
func (l *listener) teardown() error { func (l *listener) teardown() error {
...@@ -60,57 +86,8 @@ func (l *listener) SetAddrFilters(fs *filter.Filters) { ...@@ -60,57 +86,8 @@ func (l *listener) SetAddrFilters(fs *filter.Filters) {
// Accept waits for and returns the next connection to the listener. // Accept waits for and returns the next connection to the listener.
// Note that unfortunately this // Note that unfortunately this
func (l *listener) Accept() (net.Conn, error) { func (l *listener) Accept() (net.Conn, error) {
for con := range l.incoming {
// listeners dont have contexts. given changes dont make sense here anymore c, err := newSingleConn(l.ctx, l.local, "", con)
// note that the parent of listener will Close, which will interrupt all io.
// Contexts and io don't mix.
ctx := context.Background()
var catcher tec.TempErrCatcher
catcher.IsTemp = func(e error) bool {
// ignore connection breakages up to this point. but log them
if e == io.EOF {
log.Debugf("listener ignoring conn with EOF: %s", e)
return true
}
te, ok := e.(tec.Temporary)
if ok {
log.Debugf("listener ignoring conn with temporary err: %s", e)
return te.Temporary()
}
return false
}
for {
maconn, err := l.Listener.Accept()
if err != nil {
if catcher.IsTemporary(err) {
continue
}
return nil, err
}
log.Debugf("listener %s got connection: %s <---> %s", l, maconn.LocalMultiaddr(), maconn.RemoteMultiaddr())
if l.filters != nil && l.filters.AddrBlocked(maconn.RemoteMultiaddr()) {
log.Debugf("blocked connection from %s", maconn.RemoteMultiaddr())
maconn.Close()
continue
}
// If we have a wrapper func, wrap this conn
if l.wrapper != nil {
maconn = l.wrapper(maconn)
}
_, _, err = l.mux.Negotiate(maconn)
if err != nil {
log.Info("negotiation of crypto protocol failed: ", err)
continue
}
c, err := newSingleConn(ctx, l.local, "", maconn)
if err != nil { if err != nil {
if catcher.IsTemporary(err) { if catcher.IsTemporary(err) {
continue continue
...@@ -122,13 +99,14 @@ func (l *listener) Accept() (net.Conn, error) { ...@@ -122,13 +99,14 @@ func (l *listener) Accept() (net.Conn, error) {
log.Warning("listener %s listening INSECURELY!", l) log.Warning("listener %s listening INSECURELY!", l)
return c, nil return c, nil
} }
sc, err := newSecureConn(ctx, l.privk, c) sc, err := newSecureConn(l.ctx, l.privk, c)
if err != nil { if err != nil {
log.Infof("ignoring conn we failed to secure: %s %s", err, c) log.Infof("ignoring conn we failed to secure: %s %s", err, c)
continue continue
} }
return sc, nil return sc, nil
} }
return nil, fmt.Errorf("listener is closed")
} }
func (l *listener) Addr() net.Addr { func (l *listener) Addr() net.Addr {
...@@ -157,12 +135,62 @@ func (l *listener) Loggable() map[string]interface{} { ...@@ -157,12 +135,62 @@ func (l *listener) Loggable() map[string]interface{} {
} }
} }
func (l *listener) handleIncoming() {
var wg sync.WaitGroup
defer func() {
wg.Wait()
close(l.incoming)
}()
for {
maconn, err := l.Listener.Accept()
if err != nil {
if catcher.IsTemporary(err) {
continue
}
log.Warningf("listener errored and will close: %s", err)
return
}
log.Debugf("listener %s got connection: %s <---> %s", l, maconn.LocalMultiaddr(), maconn.RemoteMultiaddr())
if l.filters != nil && l.filters.AddrBlocked(maconn.RemoteMultiaddr()) {
log.Debugf("blocked connection from %s", maconn.RemoteMultiaddr())
maconn.Close()
continue
}
// If we have a wrapper func, wrap this conn
if l.wrapper != nil {
maconn = l.wrapper(maconn)
}
wg.Add(1)
go func() {
defer wg.Done()
maconn.SetReadDeadline(time.Now().Add(NegotiateReadTimeout))
_, _, err = l.mux.Negotiate(maconn)
if err != nil {
log.Info("negotiation of crypto protocol failed: ", err)
maconn.Close()
return
}
// clear read readline
maconn.SetReadDeadline(time.Time{})
l.incoming <- maconn
}()
}
}
func WrapTransportListener(ctx context.Context, ml transport.Listener, local peer.ID, sk ic.PrivKey) (Listener, error) { func WrapTransportListener(ctx context.Context, ml transport.Listener, local peer.ID, sk ic.PrivKey) (Listener, error) {
l := &listener{ l := &listener{
Listener: ml, Listener: ml,
local: local, local: local,
privk: sk, privk: sk,
mux: msmux.NewMultistreamMuxer(), mux: msmux.NewMultistreamMuxer(),
incoming: make(chan transport.Conn, connAcceptBuffer),
ctx: ctx,
} }
l.proc = goprocessctx.WithContextAndTeardown(ctx, l.teardown) l.proc = goprocessctx.WithContextAndTeardown(ctx, l.teardown)
...@@ -172,6 +200,8 @@ func WrapTransportListener(ctx context.Context, ml transport.Listener, local pee ...@@ -172,6 +200,8 @@ func WrapTransportListener(ctx context.Context, ml transport.Listener, local pee
l.mux.AddHandler(NoEncryptionTag, nil) l.mux.AddHandler(NoEncryptionTag, nil)
} }
go l.handleIncoming()
log.Debugf("Conn Listener on %s", l.Multiaddr()) log.Debugf("Conn Listener on %s", l.Multiaddr())
log.Event(ctx, "swarmListen", l) log.Event(ctx, "swarmListen", l)
return l, nil return l, nil
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment