Commit 9d443900 authored by Jeromy's avatar Jeromy
Browse files

address feedback from CR

parent b938ab9a
...@@ -25,23 +25,6 @@ const NoEncryptionTag = "/plaintext/1.0.0" ...@@ -25,23 +25,6 @@ const NoEncryptionTag = "/plaintext/1.0.0"
const connAcceptBuffer = 32 const connAcceptBuffer = 32
const NegotiateReadTimeout = time.Second * 20 const NegotiateReadTimeout = time.Second * 20
var catcher = tec.TempErrCatcher{
IsTemp: func(e error) bool {
// ignore connection breakages up to this point. but log them
if e == io.EOF {
log.Debugf("listener ignoring conn with EOF: %s", e)
return true
}
te, ok := e.(tec.Temporary)
if ok {
log.Debugf("listener ignoring conn with temporary err: %s", e)
return te.Temporary()
}
return false
},
}
// ConnWrapper is any function that wraps a raw multiaddr connection // ConnWrapper is any function that wraps a raw multiaddr connection
type ConnWrapper func(transport.Conn) transport.Conn type ConnWrapper func(transport.Conn) transport.Conn
...@@ -55,12 +38,13 @@ type listener struct { ...@@ -55,12 +38,13 @@ type listener struct {
filters *filter.Filters filters *filter.Filters
wrapper ConnWrapper wrapper ConnWrapper
catcher tec.TempErrCatcher
proc goprocess.Process proc goprocess.Process
mux *msmux.MultistreamMuxer mux *msmux.MultistreamMuxer
incoming chan transport.Conn incoming chan connErr
ctx context.Context ctx context.Context
} }
...@@ -83,13 +67,23 @@ func (l *listener) SetAddrFilters(fs *filter.Filters) { ...@@ -83,13 +67,23 @@ func (l *listener) SetAddrFilters(fs *filter.Filters) {
l.filters = fs l.filters = fs
} }
type connErr struct {
conn transport.Conn
err error
}
// Accept waits for and returns the next connection to the listener. // Accept waits for and returns the next connection to the listener.
// Note that unfortunately this // Note that unfortunately this
func (l *listener) Accept() (net.Conn, error) { func (l *listener) Accept() (net.Conn, error) {
for con := range l.incoming { for con := range l.incoming {
c, err := newSingleConn(l.ctx, l.local, "", con) if con.err != nil {
return nil, con.err
}
c, err := newSingleConn(l.ctx, l.local, "", con.conn)
if err != nil { if err != nil {
if catcher.IsTemporary(err) { con.conn.Close()
if l.catcher.IsTemporary(err) {
continue continue
} }
return nil, err return nil, err
...@@ -101,6 +95,7 @@ func (l *listener) Accept() (net.Conn, error) { ...@@ -101,6 +95,7 @@ func (l *listener) Accept() (net.Conn, error) {
} }
sc, err := newSecureConn(l.ctx, l.privk, c) sc, err := newSecureConn(l.ctx, l.privk, c)
if err != nil { if err != nil {
con.conn.Close()
log.Infof("ignoring conn we failed to secure: %s %s", err, c) log.Infof("ignoring conn we failed to secure: %s %s", err, c)
continue continue
} }
...@@ -142,13 +137,17 @@ func (l *listener) handleIncoming() { ...@@ -142,13 +137,17 @@ func (l *listener) handleIncoming() {
close(l.incoming) close(l.incoming)
}() }()
wg.Add(1)
defer wg.Done()
for { for {
maconn, err := l.Listener.Accept() maconn, err := l.Listener.Accept()
if err != nil { if err != nil {
if catcher.IsTemporary(err) { if l.catcher.IsTemporary(err) {
continue continue
} }
log.Warningf("listener errored and will close: %s", err)
l.incoming <- connErr{err: err}
return return
} }
...@@ -170,7 +169,7 @@ func (l *listener) handleIncoming() { ...@@ -170,7 +169,7 @@ func (l *listener) handleIncoming() {
maconn.SetReadDeadline(time.Now().Add(NegotiateReadTimeout)) maconn.SetReadDeadline(time.Now().Add(NegotiateReadTimeout))
_, _, err = l.mux.Negotiate(maconn) _, _, err = l.mux.Negotiate(maconn)
if err != nil { if err != nil {
log.Info("negotiation of crypto protocol failed: ", err) log.Info("incoming conn: negotiation of crypto protocol failed: ", err)
maconn.Close() maconn.Close()
return return
} }
...@@ -178,7 +177,7 @@ func (l *listener) handleIncoming() { ...@@ -178,7 +177,7 @@ func (l *listener) handleIncoming() {
// clear read readline // clear read readline
maconn.SetReadDeadline(time.Time{}) maconn.SetReadDeadline(time.Time{})
l.incoming <- maconn l.incoming <- connErr{conn: maconn}
}() }()
} }
} }
...@@ -189,10 +188,24 @@ func WrapTransportListener(ctx context.Context, ml transport.Listener, local pee ...@@ -189,10 +188,24 @@ func WrapTransportListener(ctx context.Context, ml transport.Listener, local pee
local: local, local: local,
privk: sk, privk: sk,
mux: msmux.NewMultistreamMuxer(), mux: msmux.NewMultistreamMuxer(),
incoming: make(chan transport.Conn, connAcceptBuffer), incoming: make(chan connErr, connAcceptBuffer),
ctx: ctx, ctx: ctx,
} }
l.proc = goprocessctx.WithContextAndTeardown(ctx, l.teardown) l.proc = goprocessctx.WithContextAndTeardown(ctx, l.teardown)
l.catcher.IsTemp = func(e error) bool {
// ignore connection breakages up to this point. but log them
if e == io.EOF {
log.Debugf("listener ignoring conn with EOF: %s", e)
return true
}
te, ok := e.(tec.Temporary)
if ok {
log.Debugf("listener ignoring conn with temporary err: %s", e)
return te.Temporary()
}
return false
}
if EncryptConnections { if EncryptConnections {
l.mux.AddHandler(SecioTag, nil) l.mux.AddHandler(SecioTag, nil)
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment