diff --git a/p2p/net/mock/mock_conn.go b/p2p/net/mock/mock_conn.go index 68e9c840cc455fde1798d91cd5f5f201fecde0d2..f4cf10358b6e8765fe3caad9e459dbf1bf2df047 100644 --- a/p2p/net/mock/mock_conn.go +++ b/p2p/net/mock/mock_conn.go @@ -29,14 +29,16 @@ type conn struct { rconn *conn // counterpart streams list.List proc process.Process + stat inet.Stat sync.RWMutex } -func newConn(ln, rn *peernet, l *link) *conn { +func newConn(ln, rn *peernet, l *link, dir inet.Direction) *conn { c := &conn{net: ln, link: l} c.local = ln.peer c.remote = rn.peer + c.stat = inet.Stat{Direction: dir} c.localAddr = ln.ps.Addrs(ln.peer)[0] c.remoteAddr = rn.ps.Addrs(rn.peer)[0] @@ -155,3 +157,8 @@ func (c *conn) RemotePeer() peer.ID { func (c *conn) RemotePublicKey() ic.PubKey { return c.remotePubKey } + +// Stat returns metadata about the connection +func (c *conn) Stat() inet.Stat { + return c.stat +} diff --git a/p2p/net/mock/mock_link.go b/p2p/net/mock/mock_link.go index d4d605310d78b29f820f45744540416930894262..9c6049ac88f2785851051412824ae0262c662b39 100644 --- a/p2p/net/mock/mock_link.go +++ b/p2p/net/mock/mock_link.go @@ -33,8 +33,8 @@ func (l *link) newConnPair(dialer *peernet) (*conn, *conn) { l.RLock() defer l.RUnlock() - c1 := newConn(l.nets[0], l.nets[1], l) - c2 := newConn(l.nets[1], l.nets[0], l) + c1 := newConn(l.nets[0], l.nets[1], l, inet.DirOutbound) + c2 := newConn(l.nets[1], l.nets[0], l, inet.DirInbound) c1.rconn = c2 c2.rconn = c1 @@ -48,8 +48,8 @@ func (l *link) newStreamPair() (*stream, *stream) { ra, wb := io.Pipe() rb, wa := io.Pipe() - sa := NewStream(wa, ra) - sb := NewStream(wb, rb) + sa := NewStream(wa, ra, inet.DirOutbound) + sb := NewStream(wb, rb, inet.DirInbound) return sa, sb } diff --git a/p2p/net/mock/mock_stream.go b/p2p/net/mock/mock_stream.go index 6d056c8ea08a492f1f6e346fb7e80855c8870a20..4f0c395fec694baeea0137eea74351df3f240206 100644 --- a/p2p/net/mock/mock_stream.go +++ b/p2p/net/mock/mock_stream.go @@ -25,6 +25,7 @@ type stream struct { writeErr error protocol protocol.ID + stat inet.Stat } var ErrReset error = errors.New("stream reset") @@ -35,7 +36,7 @@ type transportObject struct { arrivalTime time.Time } -func NewStream(w *io.PipeWriter, r *io.PipeReader) *stream { +func NewStream(w *io.PipeWriter, r *io.PipeReader, dir inet.Direction) *stream { s := &stream{ read: r, write: w, @@ -43,6 +44,7 @@ func NewStream(w *io.PipeWriter, r *io.PipeReader) *stream { close: make(chan struct{}, 1), closed: make(chan struct{}), toDeliver: make(chan *transportObject), + stat: inet.Stat{Direction: dir}, } go s.transport() @@ -66,6 +68,10 @@ func (s *stream) Protocol() protocol.ID { return s.protocol } +func (s *stream) Stat() inet.Stat { + return s.stat +} + func (s *stream) SetProtocol(proto protocol.ID) { s.protocol = proto }