diff --git a/p2p/net/conn/listen.go b/p2p/net/conn/listen.go index 51a6dde03a5cdef7a6728f6908a42e889bf1648d..20d7ffd0ff8249d9dc664f67faf10fa08771a25f 100644 --- a/p2p/net/conn/listen.go +++ b/p2p/net/conn/listen.go @@ -25,23 +25,6 @@ const NoEncryptionTag = "/plaintext/1.0.0" const connAcceptBuffer = 32 const NegotiateReadTimeout = time.Second * 20 -var catcher = tec.TempErrCatcher{ - IsTemp: func(e error) bool { - // ignore connection breakages up to this point. but log them - if e == io.EOF { - log.Debugf("listener ignoring conn with EOF: %s", e) - return true - } - - te, ok := e.(tec.Temporary) - if ok { - log.Debugf("listener ignoring conn with temporary err: %s", e) - return te.Temporary() - } - return false - }, -} - // ConnWrapper is any function that wraps a raw multiaddr connection type ConnWrapper func(transport.Conn) transport.Conn @@ -55,12 +38,13 @@ type listener struct { filters *filter.Filters wrapper ConnWrapper + catcher tec.TempErrCatcher proc goprocess.Process mux *msmux.MultistreamMuxer - incoming chan transport.Conn + incoming chan connErr ctx context.Context } @@ -83,13 +67,23 @@ func (l *listener) SetAddrFilters(fs *filter.Filters) { l.filters = fs } +type connErr struct { + conn transport.Conn + err error +} + // Accept waits for and returns the next connection to the listener. // Note that unfortunately this func (l *listener) Accept() (net.Conn, error) { for con := range l.incoming { - c, err := newSingleConn(l.ctx, l.local, "", con) + if con.err != nil { + return nil, con.err + } + + c, err := newSingleConn(l.ctx, l.local, "", con.conn) if err != nil { - if catcher.IsTemporary(err) { + con.conn.Close() + if l.catcher.IsTemporary(err) { continue } return nil, err @@ -101,6 +95,7 @@ func (l *listener) Accept() (net.Conn, error) { } sc, err := newSecureConn(l.ctx, l.privk, c) if err != nil { + con.conn.Close() log.Infof("ignoring conn we failed to secure: %s %s", err, c) continue } @@ -142,13 +137,17 @@ func (l *listener) handleIncoming() { close(l.incoming) }() + wg.Add(1) + defer wg.Done() + for { maconn, err := l.Listener.Accept() if err != nil { - if catcher.IsTemporary(err) { + if l.catcher.IsTemporary(err) { continue } - log.Warningf("listener errored and will close: %s", err) + + l.incoming <- connErr{err: err} return } @@ -170,7 +169,7 @@ func (l *listener) handleIncoming() { maconn.SetReadDeadline(time.Now().Add(NegotiateReadTimeout)) _, _, err = l.mux.Negotiate(maconn) if err != nil { - log.Info("negotiation of crypto protocol failed: ", err) + log.Info("incoming conn: negotiation of crypto protocol failed: ", err) maconn.Close() return } @@ -178,7 +177,7 @@ func (l *listener) handleIncoming() { // clear read readline maconn.SetReadDeadline(time.Time{}) - l.incoming <- maconn + l.incoming <- connErr{conn: maconn} }() } } @@ -189,10 +188,24 @@ func WrapTransportListener(ctx context.Context, ml transport.Listener, local pee local: local, privk: sk, mux: msmux.NewMultistreamMuxer(), - incoming: make(chan transport.Conn, connAcceptBuffer), + incoming: make(chan connErr, connAcceptBuffer), ctx: ctx, } l.proc = goprocessctx.WithContextAndTeardown(ctx, l.teardown) + l.catcher.IsTemp = func(e error) bool { + // ignore connection breakages up to this point. but log them + if e == io.EOF { + log.Debugf("listener ignoring conn with EOF: %s", e) + return true + } + + te, ok := e.(tec.Temporary) + if ok { + log.Debugf("listener ignoring conn with temporary err: %s", e) + return te.Temporary() + } + return false + } if EncryptConnections { l.mux.AddHandler(SecioTag, nil)