Commit 85a86c83 authored by Marten Seemann's avatar Marten Seemann
Browse files

implement ping/2.0.0, which uses a new stream for every ping

parent 1f1aca1c
...@@ -12,67 +12,59 @@ import ( ...@@ -12,67 +12,59 @@ import (
host "github.com/libp2p/go-libp2p-host" host "github.com/libp2p/go-libp2p-host"
inet "github.com/libp2p/go-libp2p-net" inet "github.com/libp2p/go-libp2p-net"
peer "github.com/libp2p/go-libp2p-peer" peer "github.com/libp2p/go-libp2p-peer"
multistream "github.com/multiformats/go-multistream"
) )
var log = logging.Logger("ping") var log = logging.Logger("ping")
// PingSize is the size of ping in bytes
const PingSize = 32 const PingSize = 32
const ID = "/ipfs/ping/1.0.0" // ID is the protocol ID of the ping/2.0.0 protocol
// This version uses a new stream for every ping.
const ID = "/ipfs/ping/2.0.0"
const pingTimeout = time.Second * 60 // IDLegacy is the protocol ID of the ping/1.0.0 protocol
// This version uses a single stream for all pings.
const IDLegacy = "/ipfs/ping/1.0.0"
type PingService struct { type PingService struct {
Host host.Host Host host.Host
} }
// NewPingService creates a new PingService.
// It uses supports both ping/1.0.0 and ping/2.0.0 for incoming pings
func NewPingService(h host.Host) *PingService { func NewPingService(h host.Host) *PingService {
ps := &PingService{h} ps := &PingService{h}
h.SetStreamHandler(IDLegacy, ps.PingHandlerLegacy)
h.SetStreamHandler(ID, ps.PingHandler) h.SetStreamHandler(ID, ps.PingHandler)
return ps return ps
} }
func (p *PingService) PingHandler(s inet.Stream) { // PingHandler handles a ping/2.0.0 ping.
buf := make([]byte, PingSize) func (ps *PingService) PingHandler(s inet.Stream) {
defer s.Close()
errCh := make(chan error, 1)
defer close(errCh)
timer := time.NewTimer(pingTimeout)
defer timer.Stop()
go func() { buf := make([]byte, PingSize)
select { if _, err := io.ReadFull(s, buf); err != nil {
case <-timer.C:
log.Debug("ping timeout")
case err, ok := <-errCh:
if ok {
log.Debug(err) log.Debug(err)
} else {
log.Error("ping loop failed without error")
}
}
s.Reset()
}()
for {
_, err := io.ReadFull(s, buf)
if err != nil {
errCh <- err
return return
} }
_, err = s.Write(buf) if _, err := s.Write(buf); err != nil {
if err != nil { log.Debug(err)
errCh <- err
return return
} }
timer.Reset(pingTimeout)
}
} }
// Ping pings a peer.
// It first attempts to use ping/2.0.0, and falls back to ping/1.0.0
// if the peer doesn't support it yet.
func (ps *PingService) Ping(ctx context.Context, p peer.ID) (<-chan time.Duration, error) { func (ps *PingService) Ping(ctx context.Context, p peer.ID) (<-chan time.Duration, error) {
s, err := ps.Host.NewStream(ctx, p, ID) s, err := ps.Host.NewStream(ctx, p, ID)
if err == multistream.ErrNotSupported {
return ps.PingLegacy(ctx, p)
}
if err != nil { if err != nil {
return nil, err return nil, err
} }
...@@ -80,12 +72,13 @@ func (ps *PingService) Ping(ctx context.Context, p peer.ID) (<-chan time.Duratio ...@@ -80,12 +72,13 @@ func (ps *PingService) Ping(ctx context.Context, p peer.ID) (<-chan time.Duratio
out := make(chan time.Duration) out := make(chan time.Duration)
go func() { go func() {
defer close(out) defer close(out)
defer s.Reset()
for { for {
select { select {
case <-ctx.Done(): case <-ctx.Done():
return return
default: default:
}
t, err := ping(s) t, err := ping(s)
if err != nil { if err != nil {
log.Debugf("ping error: %s", err) log.Debugf("ping error: %s", err)
...@@ -95,9 +88,14 @@ func (ps *PingService) Ping(ctx context.Context, p peer.ID) (<-chan time.Duratio ...@@ -95,9 +88,14 @@ func (ps *PingService) Ping(ctx context.Context, p peer.ID) (<-chan time.Duratio
ps.Host.Peerstore().RecordLatency(p, t) ps.Host.Peerstore().RecordLatency(p, t)
select { select {
case out <- t: case out <- t:
case <-ctx.Done(): s, err = ps.Host.NewStream(ctx, p, ID)
if err != nil {
log.Debugf("ping error: %s", err)
return return
} }
defer s.Close()
case <-ctx.Done():
return
} }
} }
}() }()
...@@ -106,23 +104,23 @@ func (ps *PingService) Ping(ctx context.Context, p peer.ID) (<-chan time.Duratio ...@@ -106,23 +104,23 @@ func (ps *PingService) Ping(ctx context.Context, p peer.ID) (<-chan time.Duratio
} }
func ping(s inet.Stream) (time.Duration, error) { func ping(s inet.Stream) (time.Duration, error) {
defer s.Close()
buf := make([]byte, PingSize) buf := make([]byte, PingSize)
u.NewTimeSeededRand().Read(buf) u.NewTimeSeededRand().Read(buf)
before := time.Now() before := time.Now()
_, err := s.Write(buf) if _, err := s.Write(buf); err != nil {
if err != nil {
return 0, err return 0, err
} }
rbuf := make([]byte, PingSize) rbuf := make([]byte, PingSize)
_, err = io.ReadFull(s, rbuf) if _, err := io.ReadFull(s, rbuf); err != nil {
if err != nil {
return 0, err return 0, err
} }
if !bytes.Equal(buf, rbuf) { if !bytes.Equal(buf, rbuf) {
return 0, errors.New("ping packet was incorrect!") return 0, errors.New("ping packet was incorrect")
} }
return time.Since(before), nil return time.Since(before), nil
......
package ping
import (
"bytes"
"context"
"errors"
"io"
"time"
u "github.com/ipfs/go-ipfs-util"
inet "github.com/libp2p/go-libp2p-net"
peer "github.com/libp2p/go-libp2p-peer"
)
const pingTimeout = 60 * time.Second
// PingHandlerLegacy handles a ping/1.0.0 ping.
func (ps *PingService) PingHandlerLegacy(s inet.Stream) {
buf := make([]byte, PingSize)
errCh := make(chan error, 1)
defer close(errCh)
timer := time.NewTimer(pingTimeout)
defer timer.Stop()
go func() {
select {
case <-timer.C:
log.Debug("ping timeout")
case err, ok := <-errCh:
if ok {
log.Debug(err)
} else {
log.Error("ping loop failed without error")
}
}
s.Reset()
}()
for {
_, err := io.ReadFull(s, buf)
if err != nil {
errCh <- err
return
}
_, err = s.Write(buf)
if err != nil {
errCh <- err
return
}
timer.Reset(pingTimeout)
}
}
// PingLegacy pings a peer using ping/1.0.0
func (ps *PingService) PingLegacy(ctx context.Context, p peer.ID) (<-chan time.Duration, error) {
s, err := ps.Host.NewStream(ctx, p, IDLegacy)
if err != nil {
return nil, err
}
out := make(chan time.Duration)
go func() {
defer close(out)
defer s.Reset()
for {
select {
case <-ctx.Done():
return
default:
t, err := pingLegacy(s)
if err != nil {
log.Debugf("ping error: %s", err)
return
}
ps.Host.Peerstore().RecordLatency(p, t)
select {
case out <- t:
case <-ctx.Done():
return
}
}
}
}()
return out, nil
}
func pingLegacy(s inet.Stream) (time.Duration, error) {
buf := make([]byte, PingSize)
u.NewTimeSeededRand().Read(buf)
before := time.Now()
_, err := s.Write(buf)
if err != nil {
return 0, err
}
rbuf := make([]byte, PingSize)
_, err = io.ReadFull(s, rbuf)
if err != nil {
return 0, err
}
if !bytes.Equal(buf, rbuf) {
return 0, errors.New("ping packet was incorrect!")
}
return time.Since(before), nil
}
...@@ -33,6 +33,28 @@ func TestPing(t *testing.T) { ...@@ -33,6 +33,28 @@ func TestPing(t *testing.T) {
testPing(t, ps2, h1.ID()) testPing(t, ps2, h1.ID())
} }
func TestPingLegacyFallback(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
h1 := bhost.New(swarmt.GenSwarm(t, ctx))
h2 := bhost.New(swarmt.GenSwarm(t, ctx))
err := h1.Connect(ctx, pstore.PeerInfo{
ID: h2.ID(),
Addrs: h2.Addrs(),
})
if err != nil {
t.Fatal(err)
}
ps1 := NewPingService(h1)
NewPingService(h2)
h2.RemoveStreamHandler(ID)
testPing(t, ps1, h2.ID())
}
func testPing(t *testing.T, ps *PingService, p peer.ID) { func testPing(t *testing.T, ps *PingService, p peer.ID) {
pctx, cancel := context.WithCancel(context.Background()) pctx, cancel := context.WithCancel(context.Background())
defer cancel() defer cancel()
...@@ -43,8 +65,11 @@ func testPing(t *testing.T, ps *PingService, p peer.ID) { ...@@ -43,8 +65,11 @@ func testPing(t *testing.T, ps *PingService, p peer.ID) {
for i := 0; i < 5; i++ { for i := 0; i < 5; i++ {
select { select {
case took := <-ts: case took, ok := <-ts:
t.Log("ping took: ", took) if !ok {
t.Fatal("ping failed")
}
t.Logf("ping took: %s", took)
case <-time.After(time.Second * 4): case <-time.After(time.Second * 4):
t.Fatal("failed to receive ping") t.Fatal("failed to receive ping")
} }
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment