Commit 1c10ed41 authored by Jeromy Johnson's avatar Jeromy Johnson Committed by GitHub
Browse files

Merge pull request #148 from libp2p/feat/stream-deadlines

update dependencies and add deadline methods to streams
parents bb6a43ca 6f804db7
...@@ -21,7 +21,7 @@ import ( ...@@ -21,7 +21,7 @@ import (
msmux "github.com/whyrusleeping/go-multistream" msmux "github.com/whyrusleeping/go-multistream"
) )
var log = logging.Logger("github.com/libp2p/go-libp2p/p2p/host/basic") var log = logging.Logger("basichost")
// Option is a type used to pass in options to the host. // Option is a type used to pass in options to the host.
type Option int type Option int
......
...@@ -2,7 +2,7 @@ package mocknet ...@@ -2,7 +2,7 @@ package mocknet
import ( import (
// "fmt" // "fmt"
"io" "net"
"sync" "sync"
"time" "time"
...@@ -45,11 +45,10 @@ func (l *link) newConnPair(dialer *peernet) (*conn, *conn) { ...@@ -45,11 +45,10 @@ func (l *link) newConnPair(dialer *peernet) (*conn, *conn) {
} }
func (l *link) newStreamPair() (*stream, *stream) { func (l *link) newStreamPair() (*stream, *stream) {
r1, w1 := io.Pipe() a, b := net.Pipe()
r2, w2 := io.Pipe()
s1 := NewStream(w2, r1) s1 := NewStream(a)
s2 := NewStream(w1, r2) s2 := NewStream(b)
return s1, s2 return s1, s2
} }
......
...@@ -3,6 +3,7 @@ package mocknet ...@@ -3,6 +3,7 @@ package mocknet
import ( import (
"bytes" "bytes"
"io" "io"
"net"
"time" "time"
process "github.com/jbenet/goprocess" process "github.com/jbenet/goprocess"
...@@ -12,8 +13,7 @@ import ( ...@@ -12,8 +13,7 @@ import (
// stream implements inet.Stream // stream implements inet.Stream
type stream struct { type stream struct {
io.Reader Pipe net.Conn
io.Writer
conn *conn conn *conn
toDeliver chan *transportObject toDeliver chan *transportObject
proc process.Process proc process.Process
...@@ -26,10 +26,9 @@ type transportObject struct { ...@@ -26,10 +26,9 @@ type transportObject struct {
arrivalTime time.Time arrivalTime time.Time
} }
func NewStream(w io.Writer, r io.Reader) *stream { func NewStream(p net.Conn) *stream {
s := &stream{ s := &stream{
Reader: r, Pipe: p,
Writer: w,
toDeliver: make(chan *transportObject), toDeliver: make(chan *transportObject),
} }
...@@ -70,12 +69,7 @@ func (s *stream) teardown() error { ...@@ -70,12 +69,7 @@ func (s *stream) teardown() error {
// at this point, no streams are writing. // at this point, no streams are writing.
s.conn.removeStream(s) s.conn.removeStream(s)
if r, ok := (s.Reader).(io.Closer); ok { s.Pipe.Close()
r.Close()
}
if w, ok := (s.Writer).(io.Closer); ok {
w.Close()
}
s.conn.net.notifyAll(func(n inet.Notifiee) { s.conn.net.notifyAll(func(n inet.Notifiee) {
n.ClosedStream(s.conn.net, s) n.ClosedStream(s.conn.net, s)
}) })
...@@ -86,6 +80,22 @@ func (s *stream) Conn() inet.Conn { ...@@ -86,6 +80,22 @@ func (s *stream) Conn() inet.Conn {
return s.conn return s.conn
} }
func (s *stream) SetDeadline(t time.Time) error {
return s.Pipe.SetDeadline(t)
}
func (s *stream) SetReadDeadline(t time.Time) error {
return s.Pipe.SetReadDeadline(t)
}
func (s *stream) SetWriteDeadline(t time.Time) error {
return s.Pipe.SetWriteDeadline(t)
}
func (s *stream) Read(b []byte) (int, error) {
return s.Pipe.Read(b)
}
// transport will grab message arrival times, wait until that time, and // transport will grab message arrival times, wait until that time, and
// then write the message out when it is scheduled to arrive // then write the message out when it is scheduled to arrive
func (s *stream) transport(proc process.Process) { func (s *stream) transport(proc process.Process) {
...@@ -97,7 +107,7 @@ func (s *stream) transport(proc process.Process) { ...@@ -97,7 +107,7 @@ func (s *stream) transport(proc process.Process) {
// done only when arrival time makes sense. // done only when arrival time makes sense.
drainBuf := func() { drainBuf := func() {
if buf.Len() > 0 { if buf.Len() > 0 {
_, err := s.Writer.Write(buf.Bytes()) _, err := s.Pipe.Write(buf.Bytes())
if err != nil { if err != nil {
return return
} }
...@@ -131,7 +141,7 @@ func (s *stream) transport(proc process.Process) { ...@@ -131,7 +141,7 @@ func (s *stream) transport(proc process.Process) {
drainBuf() drainBuf()
// write this message. // write this message.
_, err := s.Writer.Write(o.msg) _, err := s.Pipe.Write(o.msg)
if err != nil { if err != nil {
log.Error("mock_stream", err) log.Error("mock_stream", err)
} }
......
...@@ -98,11 +98,6 @@ ...@@ -98,11 +98,6 @@
"name": "randbo", "name": "randbo",
"version": "0.0.0" "version": "0.0.0"
}, },
{
"hash": "Qmb1US8uyZeEpMyc56wVZy2cDFdQjNFojAUYVCoo9ieTqp",
"name": "go-stream-muxer",
"version": "1.0.0"
},
{ {
"hash": "QmeQW4ayVqi7Jjay1SrP2wYydsH9KwSrzQBnqyC25gPFnG", "hash": "QmeQW4ayVqi7Jjay1SrP2wYydsH9KwSrzQBnqyC25gPFnG",
"name": "go-notifier", "name": "go-notifier",
...@@ -114,9 +109,9 @@ ...@@ -114,9 +109,9 @@
"version": "0.0.0" "version": "0.0.0"
}, },
{ {
"hash": "QmS9en3mcwW2HRSeRabceJEGVxTZF4vEeFm7JHWQwWsb1U", "hash": "QmVwFjMdejJ8mGVmgyR2mKcUHrvNBDtDsKRT99soVbkFhA",
"name": "go-peerstream", "name": "go-peerstream",
"version": "1.4.1" "version": "1.5.0"
}, },
{ {
"author": "whyrusleeping", "author": "whyrusleeping",
...@@ -142,24 +137,6 @@ ...@@ -142,24 +137,6 @@
"name": "go-libp2p-secio", "name": "go-libp2p-secio",
"version": "1.1.0" "version": "1.1.0"
}, },
{
"author": "whyrusleeping",
"hash": "QmSHTSkxXGQgaHWz91oZV3CDy3hmKmDgpjbYRT6niACG4E",
"name": "go-smux-yamux",
"version": "1.1.1"
},
{
"author": "whyrusleeping",
"hash": "QmetupZ62uEdoqNsbZUCgqU3JyfssExBfqBwBhDpjyE6eW",
"name": "go-smux-multistream",
"version": "1.4.0"
},
{
"author": "whyrusleeping",
"hash": "QmfXgTygwsTPyUWPWTAeBK6cFtTdMqmeeqhyhcNMhRpT1g",
"name": "go-smux-spdystream",
"version": "1.1.1"
},
{ {
"author": "whyrusleeping", "author": "whyrusleeping",
"hash": "QmXXCcQ7CLg5a81Ui9TTR35QcR4y7ZyihxwfjqaHfUVcVo", "hash": "QmXXCcQ7CLg5a81Ui9TTR35QcR4y7ZyihxwfjqaHfUVcVo",
...@@ -216,15 +193,15 @@ ...@@ -216,15 +193,15 @@
}, },
{ {
"author": "whyrusleeping", "author": "whyrusleeping",
"hash": "QmdysBu77i3YaagNtMAjiCJdeWWvds18ho5XEB784guQ41", "hash": "QmU3pGGVT1riXp5dBJbNrGpxssVScfvk9236drRHZZbKJ1",
"name": "go-libp2p-net", "name": "go-libp2p-net",
"version": "1.5.0" "version": "1.6.0"
}, },
{ {
"author": "whyrusleeping", "author": "whyrusleeping",
"hash": "QmVcNzHewFvmVah1CGqg8NV7nHHsPu19U43YE5b2oqWyBp", "hash": "QmX4j1JhubdEt4EB1JY1mMKTvJwPZSRzTv3uwh5zaDqyAi",
"name": "go-libp2p-metrics", "name": "go-libp2p-metrics",
"version": "1.5.0" "version": "1.6.0"
}, },
{ {
"author": "whyrusleeping", "author": "whyrusleeping",
...@@ -234,15 +211,15 @@ ...@@ -234,15 +211,15 @@
}, },
{ {
"author": "whyrusleeping", "author": "whyrusleeping",
"hash": "QmWf338UyG5DKyemvoFiomDPtkVNHLsw3GAt9XXHX5ZtsM", "hash": "QmU5qKZsCG1Wg38jwg8XezBdc3fBGMMZjM7YFMAhunC1Yh",
"name": "go-libp2p-host", "name": "go-libp2p-host",
"version": "1.1.1" "version": "1.2.0"
}, },
{ {
"author": "whyrusleeping", "author": "whyrusleeping",
"hash": "QmcjMKTqrWgMMCExEnwczefhno5fvx7FHDV63peZwDzHNF", "hash": "QmU9ePpXRQgGpPpMAm1CsgU9KptrtgZERrVBGB7Ek5cM2D",
"name": "go-libp2p-swarm", "name": "go-libp2p-swarm",
"version": "1.3.3" "version": "1.4.0"
}, },
{ {
"author": "whyrusleeping", "author": "whyrusleeping",
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment