From b938ab9ab685688e379e9bfbc8c714f3cf5d450f Mon Sep 17 00:00:00 2001 From: Jeromy Date: Sat, 9 Apr 2016 22:40:40 -0700 Subject: [PATCH] Handle incoming conns in their own goroutines Doing the multistream negotiation in sync causes hanging issues. This commit accepts transport connections and starts the negotiation in a separate goroutine, sending it down a channel when its ready. --- p2p/net/conn/dial_test.go | 52 +++++++++++++++ p2p/net/conn/listen.go | 134 +++++++++++++++++++++++--------------- 2 files changed, 134 insertions(+), 52 deletions(-) diff --git a/p2p/net/conn/dial_test.go b/p2p/net/conn/dial_test.go index 2179cbd..23a5914 100644 --- a/p2p/net/conn/dial_test.go +++ b/p2p/net/conn/dial_test.go @@ -397,3 +397,55 @@ func TestFailedAccept(t *testing.T) { c.Close() <-done } + +func TestHangingAccept(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + p1 := tu.RandPeerNetParamsOrFatal(t) + + l1, err := Listen(ctx, p1.Addr, p1.ID, p1.PrivKey) + if err != nil { + t.Fatal(err) + } + + p1.Addr = l1.Multiaddr() // Addr has been determined by kernel. + + done := make(chan struct{}) + go func() { + defer close(done) + con, err := net.Dial("tcp", l1.Addr().String()) + if err != nil { + t.Error("first dial failed: ", err) + } + // hang this connection + defer con.Close() + + // ensure that the first conn hits first + time.Sleep(time.Millisecond * 50) + + con2, err := net.Dial("tcp", l1.Addr().String()) + if err != nil { + t.Error("second dial failed: ", err) + } + defer con2.Close() + + err = msmux.SelectProtoOrFail(SecioTag, con2) + if err != nil { + t.Error("msmux select failed: ", err) + } + + _, err = con2.Write([]byte("test")) + if err != nil { + t.Error("con write failed: ", err) + } + }() + + c, err := l1.Accept() + if err != nil { + t.Fatal("connections after a failed accept should still work: ", err) + } + + c.Close() + <-done +} diff --git a/p2p/net/conn/listen.go b/p2p/net/conn/listen.go index 3d81cba..51a6dde 100644 --- a/p2p/net/conn/listen.go +++ b/p2p/net/conn/listen.go @@ -4,6 +4,8 @@ import ( "fmt" "io" "net" + "sync" + "time" ic "github.com/ipfs/go-libp2p/p2p/crypto" filter "github.com/ipfs/go-libp2p/p2p/net/filter" @@ -20,6 +22,26 @@ import ( const SecioTag = "/secio/1.0.0" const NoEncryptionTag = "/plaintext/1.0.0" +const connAcceptBuffer = 32 +const NegotiateReadTimeout = time.Second * 20 + +var catcher = tec.TempErrCatcher{ + IsTemp: func(e error) bool { + // ignore connection breakages up to this point. but log them + if e == io.EOF { + log.Debugf("listener ignoring conn with EOF: %s", e) + return true + } + + te, ok := e.(tec.Temporary) + if ok { + log.Debugf("listener ignoring conn with temporary err: %s", e) + return te.Temporary() + } + return false + }, +} + // ConnWrapper is any function that wraps a raw multiaddr connection type ConnWrapper func(transport.Conn) transport.Conn @@ -37,6 +59,10 @@ type listener struct { proc goprocess.Process mux *msmux.MultistreamMuxer + + incoming chan transport.Conn + + ctx context.Context } func (l *listener) teardown() error { @@ -60,57 +86,8 @@ func (l *listener) SetAddrFilters(fs *filter.Filters) { // Accept waits for and returns the next connection to the listener. // Note that unfortunately this func (l *listener) Accept() (net.Conn, error) { - - // listeners dont have contexts. given changes dont make sense here anymore - // note that the parent of listener will Close, which will interrupt all io. - // Contexts and io don't mix. - ctx := context.Background() - - var catcher tec.TempErrCatcher - - catcher.IsTemp = func(e error) bool { - // ignore connection breakages up to this point. but log them - if e == io.EOF { - log.Debugf("listener ignoring conn with EOF: %s", e) - return true - } - - te, ok := e.(tec.Temporary) - if ok { - log.Debugf("listener ignoring conn with temporary err: %s", e) - return te.Temporary() - } - return false - } - - for { - maconn, err := l.Listener.Accept() - if err != nil { - if catcher.IsTemporary(err) { - continue - } - return nil, err - } - - log.Debugf("listener %s got connection: %s <---> %s", l, maconn.LocalMultiaddr(), maconn.RemoteMultiaddr()) - - if l.filters != nil && l.filters.AddrBlocked(maconn.RemoteMultiaddr()) { - log.Debugf("blocked connection from %s", maconn.RemoteMultiaddr()) - maconn.Close() - continue - } - // If we have a wrapper func, wrap this conn - if l.wrapper != nil { - maconn = l.wrapper(maconn) - } - - _, _, err = l.mux.Negotiate(maconn) - if err != nil { - log.Info("negotiation of crypto protocol failed: ", err) - continue - } - - c, err := newSingleConn(ctx, l.local, "", maconn) + for con := range l.incoming { + c, err := newSingleConn(l.ctx, l.local, "", con) if err != nil { if catcher.IsTemporary(err) { continue @@ -122,13 +99,14 @@ func (l *listener) Accept() (net.Conn, error) { log.Warning("listener %s listening INSECURELY!", l) return c, nil } - sc, err := newSecureConn(ctx, l.privk, c) + sc, err := newSecureConn(l.ctx, l.privk, c) if err != nil { log.Infof("ignoring conn we failed to secure: %s %s", err, c) continue } return sc, nil } + return nil, fmt.Errorf("listener is closed") } func (l *listener) Addr() net.Addr { @@ -157,12 +135,62 @@ func (l *listener) Loggable() map[string]interface{} { } } +func (l *listener) handleIncoming() { + var wg sync.WaitGroup + defer func() { + wg.Wait() + close(l.incoming) + }() + + for { + maconn, err := l.Listener.Accept() + if err != nil { + if catcher.IsTemporary(err) { + continue + } + log.Warningf("listener errored and will close: %s", err) + return + } + + log.Debugf("listener %s got connection: %s <---> %s", l, maconn.LocalMultiaddr(), maconn.RemoteMultiaddr()) + + if l.filters != nil && l.filters.AddrBlocked(maconn.RemoteMultiaddr()) { + log.Debugf("blocked connection from %s", maconn.RemoteMultiaddr()) + maconn.Close() + continue + } + // If we have a wrapper func, wrap this conn + if l.wrapper != nil { + maconn = l.wrapper(maconn) + } + + wg.Add(1) + go func() { + defer wg.Done() + maconn.SetReadDeadline(time.Now().Add(NegotiateReadTimeout)) + _, _, err = l.mux.Negotiate(maconn) + if err != nil { + log.Info("negotiation of crypto protocol failed: ", err) + maconn.Close() + return + } + + // clear read readline + maconn.SetReadDeadline(time.Time{}) + + l.incoming <- maconn + }() + } +} + func WrapTransportListener(ctx context.Context, ml transport.Listener, local peer.ID, sk ic.PrivKey) (Listener, error) { l := &listener{ Listener: ml, local: local, privk: sk, mux: msmux.NewMultistreamMuxer(), + incoming: make(chan transport.Conn, connAcceptBuffer), + ctx: ctx, } l.proc = goprocessctx.WithContextAndTeardown(ctx, l.teardown) @@ -172,6 +200,8 @@ func WrapTransportListener(ctx context.Context, ml transport.Listener, local pee l.mux.AddHandler(NoEncryptionTag, nil) } + go l.handleIncoming() + log.Debugf("Conn Listener on %s", l.Multiaddr()) log.Event(ctx, "swarmListen", l) return l, nil -- GitLab