From 85a86c83d3437009e03496aa346d75d158999ac3 Mon Sep 17 00:00:00 2001 From: Marten Seemann Date: Thu, 30 Aug 2018 18:53:28 +0700 Subject: [PATCH] implement ping/2.0.0, which uses a new stream for every ping --- p2p/protocol/ping/ping.go | 98 +++++++++++++-------------- p2p/protocol/ping/ping_legacy.go | 113 +++++++++++++++++++++++++++++++ p2p/protocol/ping/ping_test.go | 29 +++++++- 3 files changed, 188 insertions(+), 52 deletions(-) create mode 100644 p2p/protocol/ping/ping_legacy.go diff --git a/p2p/protocol/ping/ping.go b/p2p/protocol/ping/ping.go index ae0c47f..992c041 100644 --- a/p2p/protocol/ping/ping.go +++ b/p2p/protocol/ping/ping.go @@ -12,67 +12,59 @@ import ( host "github.com/libp2p/go-libp2p-host" inet "github.com/libp2p/go-libp2p-net" peer "github.com/libp2p/go-libp2p-peer" + multistream "github.com/multiformats/go-multistream" ) var log = logging.Logger("ping") +// PingSize is the size of ping in bytes const PingSize = 32 -const ID = "/ipfs/ping/1.0.0" +// ID is the protocol ID of the ping/2.0.0 protocol +// This version uses a new stream for every ping. +const ID = "/ipfs/ping/2.0.0" -const pingTimeout = time.Second * 60 +// IDLegacy is the protocol ID of the ping/1.0.0 protocol +// This version uses a single stream for all pings. +const IDLegacy = "/ipfs/ping/1.0.0" type PingService struct { Host host.Host } +// NewPingService creates a new PingService. +// It uses supports both ping/1.0.0 and ping/2.0.0 for incoming pings func NewPingService(h host.Host) *PingService { ps := &PingService{h} + h.SetStreamHandler(IDLegacy, ps.PingHandlerLegacy) h.SetStreamHandler(ID, ps.PingHandler) return ps } -func (p *PingService) PingHandler(s inet.Stream) { - buf := make([]byte, PingSize) - - errCh := make(chan error, 1) - defer close(errCh) - timer := time.NewTimer(pingTimeout) - defer timer.Stop() - - go func() { - select { - case <-timer.C: - log.Debug("ping timeout") - case err, ok := <-errCh: - if ok { - log.Debug(err) - } else { - log.Error("ping loop failed without error") - } - } - s.Reset() - }() +// PingHandler handles a ping/2.0.0 ping. +func (ps *PingService) PingHandler(s inet.Stream) { + defer s.Close() - for { - _, err := io.ReadFull(s, buf) - if err != nil { - errCh <- err - return - } - - _, err = s.Write(buf) - if err != nil { - errCh <- err - return - } + buf := make([]byte, PingSize) + if _, err := io.ReadFull(s, buf); err != nil { + log.Debug(err) + return + } - timer.Reset(pingTimeout) + if _, err := s.Write(buf); err != nil { + log.Debug(err) + return } } +// Ping pings a peer. +// It first attempts to use ping/2.0.0, and falls back to ping/1.0.0 +// if the peer doesn't support it yet. func (ps *PingService) Ping(ctx context.Context, p peer.ID) (<-chan time.Duration, error) { s, err := ps.Host.NewStream(ctx, p, ID) + if err == multistream.ErrNotSupported { + return ps.PingLegacy(ctx, p) + } if err != nil { return nil, err } @@ -80,24 +72,30 @@ func (ps *PingService) Ping(ctx context.Context, p peer.ID) (<-chan time.Duratio out := make(chan time.Duration) go func() { defer close(out) - defer s.Reset() for { select { case <-ctx.Done(): return default: - t, err := ping(s) + } + + t, err := ping(s) + if err != nil { + log.Debugf("ping error: %s", err) + return + } + + ps.Host.Peerstore().RecordLatency(p, t) + select { + case out <- t: + s, err = ps.Host.NewStream(ctx, p, ID) if err != nil { log.Debugf("ping error: %s", err) return } - - ps.Host.Peerstore().RecordLatency(p, t) - select { - case out <- t: - case <-ctx.Done(): - return - } + defer s.Close() + case <-ctx.Done(): + return } } }() @@ -106,23 +104,23 @@ func (ps *PingService) Ping(ctx context.Context, p peer.ID) (<-chan time.Duratio } func ping(s inet.Stream) (time.Duration, error) { + defer s.Close() + buf := make([]byte, PingSize) u.NewTimeSeededRand().Read(buf) before := time.Now() - _, err := s.Write(buf) - if err != nil { + if _, err := s.Write(buf); err != nil { return 0, err } rbuf := make([]byte, PingSize) - _, err = io.ReadFull(s, rbuf) - if err != nil { + if _, err := io.ReadFull(s, rbuf); err != nil { return 0, err } if !bytes.Equal(buf, rbuf) { - return 0, errors.New("ping packet was incorrect!") + return 0, errors.New("ping packet was incorrect") } return time.Since(before), nil diff --git a/p2p/protocol/ping/ping_legacy.go b/p2p/protocol/ping/ping_legacy.go new file mode 100644 index 0000000..7cff819 --- /dev/null +++ b/p2p/protocol/ping/ping_legacy.go @@ -0,0 +1,113 @@ +package ping + +import ( + "bytes" + "context" + "errors" + "io" + "time" + + u "github.com/ipfs/go-ipfs-util" + inet "github.com/libp2p/go-libp2p-net" + peer "github.com/libp2p/go-libp2p-peer" +) + +const pingTimeout = 60 * time.Second + +// PingHandlerLegacy handles a ping/1.0.0 ping. +func (ps *PingService) PingHandlerLegacy(s inet.Stream) { + buf := make([]byte, PingSize) + + errCh := make(chan error, 1) + defer close(errCh) + timer := time.NewTimer(pingTimeout) + defer timer.Stop() + + go func() { + select { + case <-timer.C: + log.Debug("ping timeout") + case err, ok := <-errCh: + if ok { + log.Debug(err) + } else { + log.Error("ping loop failed without error") + } + } + s.Reset() + }() + + for { + _, err := io.ReadFull(s, buf) + if err != nil { + errCh <- err + return + } + + _, err = s.Write(buf) + if err != nil { + errCh <- err + return + } + + timer.Reset(pingTimeout) + } +} + +// PingLegacy pings a peer using ping/1.0.0 +func (ps *PingService) PingLegacy(ctx context.Context, p peer.ID) (<-chan time.Duration, error) { + s, err := ps.Host.NewStream(ctx, p, IDLegacy) + if err != nil { + return nil, err + } + + out := make(chan time.Duration) + go func() { + defer close(out) + defer s.Reset() + for { + select { + case <-ctx.Done(): + return + default: + t, err := pingLegacy(s) + if err != nil { + log.Debugf("ping error: %s", err) + return + } + + ps.Host.Peerstore().RecordLatency(p, t) + select { + case out <- t: + case <-ctx.Done(): + return + } + } + } + }() + + return out, nil +} + +func pingLegacy(s inet.Stream) (time.Duration, error) { + buf := make([]byte, PingSize) + u.NewTimeSeededRand().Read(buf) + + before := time.Now() + _, err := s.Write(buf) + if err != nil { + return 0, err + } + + rbuf := make([]byte, PingSize) + _, err = io.ReadFull(s, rbuf) + if err != nil { + return 0, err + } + + if !bytes.Equal(buf, rbuf) { + return 0, errors.New("ping packet was incorrect!") + } + + return time.Since(before), nil +} diff --git a/p2p/protocol/ping/ping_test.go b/p2p/protocol/ping/ping_test.go index adcb3ca..d1d6866 100644 --- a/p2p/protocol/ping/ping_test.go +++ b/p2p/protocol/ping/ping_test.go @@ -33,6 +33,28 @@ func TestPing(t *testing.T) { testPing(t, ps2, h1.ID()) } +func TestPingLegacyFallback(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + h1 := bhost.New(swarmt.GenSwarm(t, ctx)) + h2 := bhost.New(swarmt.GenSwarm(t, ctx)) + + err := h1.Connect(ctx, pstore.PeerInfo{ + ID: h2.ID(), + Addrs: h2.Addrs(), + }) + + if err != nil { + t.Fatal(err) + } + + ps1 := NewPingService(h1) + NewPingService(h2) + h2.RemoveStreamHandler(ID) + + testPing(t, ps1, h2.ID()) +} + func testPing(t *testing.T, ps *PingService, p peer.ID) { pctx, cancel := context.WithCancel(context.Background()) defer cancel() @@ -43,8 +65,11 @@ func testPing(t *testing.T, ps *PingService, p peer.ID) { for i := 0; i < 5; i++ { select { - case took := <-ts: - t.Log("ping took: ", took) + case took, ok := <-ts: + if !ok { + t.Fatal("ping failed") + } + t.Logf("ping took: %s", took) case <-time.After(time.Second * 4): t.Fatal("failed to receive ping") } -- GitLab