From 6f804db71e571eba3114c5604f32f33e24790119 Mon Sep 17 00:00:00 2001 From: Jeromy Date: Tue, 8 Nov 2016 13:11:02 -0800 Subject: [PATCH] update dependencies and add deadline methods to streams --- p2p/host/basic/basic_host.go | 2 +- p2p/net/mock/mock_link.go | 9 ++++---- p2p/net/mock/mock_stream.go | 36 +++++++++++++++++++----------- package.json | 43 +++++++++--------------------------- 4 files changed, 38 insertions(+), 52 deletions(-) diff --git a/p2p/host/basic/basic_host.go b/p2p/host/basic/basic_host.go index d0c1575..b450db1 100644 --- a/p2p/host/basic/basic_host.go +++ b/p2p/host/basic/basic_host.go @@ -21,7 +21,7 @@ import ( msmux "github.com/whyrusleeping/go-multistream" ) -var log = logging.Logger("github.com/libp2p/go-libp2p/p2p/host/basic") +var log = logging.Logger("basichost") // Option is a type used to pass in options to the host. type Option int diff --git a/p2p/net/mock/mock_link.go b/p2p/net/mock/mock_link.go index bcb7d9a..ae74da8 100644 --- a/p2p/net/mock/mock_link.go +++ b/p2p/net/mock/mock_link.go @@ -2,7 +2,7 @@ package mocknet import ( // "fmt" - "io" + "net" "sync" "time" @@ -45,11 +45,10 @@ func (l *link) newConnPair(dialer *peernet) (*conn, *conn) { } func (l *link) newStreamPair() (*stream, *stream) { - r1, w1 := io.Pipe() - r2, w2 := io.Pipe() + a, b := net.Pipe() - s1 := NewStream(w2, r1) - s2 := NewStream(w1, r2) + s1 := NewStream(a) + s2 := NewStream(b) return s1, s2 } diff --git a/p2p/net/mock/mock_stream.go b/p2p/net/mock/mock_stream.go index bcdff3b..2529651 100644 --- a/p2p/net/mock/mock_stream.go +++ b/p2p/net/mock/mock_stream.go @@ -3,6 +3,7 @@ package mocknet import ( "bytes" "io" + "net" "time" process "github.com/jbenet/goprocess" @@ -12,8 +13,7 @@ import ( // stream implements inet.Stream type stream struct { - io.Reader - io.Writer + Pipe net.Conn conn *conn toDeliver chan *transportObject proc process.Process @@ -26,10 +26,9 @@ type transportObject struct { arrivalTime time.Time } -func NewStream(w io.Writer, r io.Reader) *stream { +func NewStream(p net.Conn) *stream { s := &stream{ - Reader: r, - Writer: w, + Pipe: p, toDeliver: make(chan *transportObject), } @@ -70,12 +69,7 @@ func (s *stream) teardown() error { // at this point, no streams are writing. s.conn.removeStream(s) - if r, ok := (s.Reader).(io.Closer); ok { - r.Close() - } - if w, ok := (s.Writer).(io.Closer); ok { - w.Close() - } + s.Pipe.Close() s.conn.net.notifyAll(func(n inet.Notifiee) { n.ClosedStream(s.conn.net, s) }) @@ -86,6 +80,22 @@ func (s *stream) Conn() inet.Conn { return s.conn } +func (s *stream) SetDeadline(t time.Time) error { + return s.Pipe.SetDeadline(t) +} + +func (s *stream) SetReadDeadline(t time.Time) error { + return s.Pipe.SetReadDeadline(t) +} + +func (s *stream) SetWriteDeadline(t time.Time) error { + return s.Pipe.SetWriteDeadline(t) +} + +func (s *stream) Read(b []byte) (int, error) { + return s.Pipe.Read(b) +} + // transport will grab message arrival times, wait until that time, and // then write the message out when it is scheduled to arrive func (s *stream) transport(proc process.Process) { @@ -97,7 +107,7 @@ func (s *stream) transport(proc process.Process) { // done only when arrival time makes sense. drainBuf := func() { if buf.Len() > 0 { - _, err := s.Writer.Write(buf.Bytes()) + _, err := s.Pipe.Write(buf.Bytes()) if err != nil { return } @@ -131,7 +141,7 @@ func (s *stream) transport(proc process.Process) { drainBuf() // write this message. - _, err := s.Writer.Write(o.msg) + _, err := s.Pipe.Write(o.msg) if err != nil { log.Error("mock_stream", err) } diff --git a/package.json b/package.json index 9830f38..fd1da9c 100644 --- a/package.json +++ b/package.json @@ -98,11 +98,6 @@ "name": "randbo", "version": "0.0.0" }, - { - "hash": "Qmb1US8uyZeEpMyc56wVZy2cDFdQjNFojAUYVCoo9ieTqp", - "name": "go-stream-muxer", - "version": "1.0.0" - }, { "hash": "QmeQW4ayVqi7Jjay1SrP2wYydsH9KwSrzQBnqyC25gPFnG", "name": "go-notifier", @@ -114,9 +109,9 @@ "version": "0.0.0" }, { - "hash": "QmS9en3mcwW2HRSeRabceJEGVxTZF4vEeFm7JHWQwWsb1U", + "hash": "QmVwFjMdejJ8mGVmgyR2mKcUHrvNBDtDsKRT99soVbkFhA", "name": "go-peerstream", - "version": "1.4.1" + "version": "1.5.0" }, { "author": "whyrusleeping", @@ -142,24 +137,6 @@ "name": "go-libp2p-secio", "version": "1.1.0" }, - { - "author": "whyrusleeping", - "hash": "QmSHTSkxXGQgaHWz91oZV3CDy3hmKmDgpjbYRT6niACG4E", - "name": "go-smux-yamux", - "version": "1.1.1" - }, - { - "author": "whyrusleeping", - "hash": "QmetupZ62uEdoqNsbZUCgqU3JyfssExBfqBwBhDpjyE6eW", - "name": "go-smux-multistream", - "version": "1.4.0" - }, - { - "author": "whyrusleeping", - "hash": "QmfXgTygwsTPyUWPWTAeBK6cFtTdMqmeeqhyhcNMhRpT1g", - "name": "go-smux-spdystream", - "version": "1.1.1" - }, { "author": "whyrusleeping", "hash": "QmXXCcQ7CLg5a81Ui9TTR35QcR4y7ZyihxwfjqaHfUVcVo", @@ -216,15 +193,15 @@ }, { "author": "whyrusleeping", - "hash": "QmdysBu77i3YaagNtMAjiCJdeWWvds18ho5XEB784guQ41", + "hash": "QmU3pGGVT1riXp5dBJbNrGpxssVScfvk9236drRHZZbKJ1", "name": "go-libp2p-net", - "version": "1.5.0" + "version": "1.6.0" }, { "author": "whyrusleeping", - "hash": "QmVcNzHewFvmVah1CGqg8NV7nHHsPu19U43YE5b2oqWyBp", + "hash": "QmX4j1JhubdEt4EB1JY1mMKTvJwPZSRzTv3uwh5zaDqyAi", "name": "go-libp2p-metrics", - "version": "1.5.0" + "version": "1.6.0" }, { "author": "whyrusleeping", @@ -234,15 +211,15 @@ }, { "author": "whyrusleeping", - "hash": "QmWf338UyG5DKyemvoFiomDPtkVNHLsw3GAt9XXHX5ZtsM", + "hash": "QmU5qKZsCG1Wg38jwg8XezBdc3fBGMMZjM7YFMAhunC1Yh", "name": "go-libp2p-host", - "version": "1.1.1" + "version": "1.2.0" }, { "author": "whyrusleeping", - "hash": "QmcjMKTqrWgMMCExEnwczefhno5fvx7FHDV63peZwDzHNF", + "hash": "QmU9ePpXRQgGpPpMAm1CsgU9KptrtgZERrVBGB7Ek5cM2D", "name": "go-libp2p-swarm", - "version": "1.3.3" + "version": "1.4.0" }, { "author": "whyrusleeping", -- GitLab