echo.go 3.58 KB
Newer Older
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
package main

import (
	"bufio"
	"context"
	"fmt"
	"log"

	host "gx/ipfs/QmRS46AyqtpJBsf1zmQdeizSDEzo1qkWR7rdEuPFAv8237/go-libp2p-host"
	inet "gx/ipfs/QmbD5yKbXahNvoMqzeuNyKQA9vAs9fUvJg2GXeWU1fVqY5/go-libp2p-net"

	uuid "github.com/google/uuid"
	p2p "github.com/libp2p/go-libp2p/examples/multipro/pb"
	protobufCodec "github.com/multiformats/go-multicodec/protobuf"
	"github.com/ipfs/go-ipfs/thirdparty/assert"
)

// pattern: /protocol-name/request-or-response-message/version
const echoRequest = "/echo/echoreq/0.0.1"
const echoResponse = "/echo/echoresp/0.0.1"

type EchoProtocol struct {
	host     host.Host                   // local host
	requests map[string] *p2p.EchoRequest // used to access request data from response handlers
	done     chan bool                   // only for demo purposes to hold main from terminating
}

func NewEchoProtocol(host host.Host, done chan bool) *EchoProtocol {
	e := EchoProtocol{host: host, requests: make(map[string]*p2p.EchoRequest), done: done}
	host.SetStreamHandler(echoRequest, e.onEchoRequest)
	host.SetStreamHandler(echoResponse, e.onEchoResponse)
	return &e
}

// remote peer requests handler
func (e *EchoProtocol) onEchoRequest(s inet.Stream) {

	// get request data
	data := &p2p.EchoRequest{}
	decoder := protobufCodec.Multicodec(nil).Decoder(bufio.NewReader(s))
	err := decoder.Decode(data)
	if err != nil {
		log.Fatal(err)
		return
	}

	log.Printf("%s: Received echo request from %s. Message: %s", s.Conn().LocalPeer(), s.Conn().RemotePeer(), data.Message)

	// send response to sender
	log.Printf("%s: Sending echo response to %s. Message id: %s...", s.Conn().LocalPeer(), s.Conn().RemotePeer(), data.MessageData.Id)
	resp := &p2p.EchoResponse{
		MessageData: NewMessageData(e.host.ID().String(), data.MessageData.Id, false),
		Message:     data.Message}

	s, respErr := e.host.NewStream(context.Background(), s.Conn().RemotePeer(), echoResponse)
	if respErr != nil {
		log.Fatal(respErr)
		return
	}

	ok := sendDataObject(resp, s)

	if ok {
		log.Printf("%s: Echo response to %s sent.", s.Conn().LocalPeer().String(), s.Conn().RemotePeer().String())
	}
}

// remote echo response handler
func (e *EchoProtocol) onEchoResponse(s inet.Stream) {
	data := &p2p.EchoResponse{}
	decoder := protobufCodec.Multicodec(nil).Decoder(bufio.NewReader(s))
	err := decoder.Decode(data)
	if err != nil {
		return
	}

	// locate request data and remove it if found
	req, ok := e.requests[data.MessageData.Id]
	if ok {
		// remove request from map as we have processed it here
		delete(e.requests, data.MessageData.Id)
	} else {
		log.Print("Failed to locate request data boject for response")
		return
	}

	assert.True(req.Message == data.Message, nil, "Expected echo to respond with request message")

	log.Printf("%s: Received Echo response from %s. Message id:%s. Message: %s.", s.Conn().LocalPeer(), s.Conn().RemotePeer(), data.MessageData.Id, data.Message)
	e.done <- true
}

func (e *EchoProtocol) Echo(node *Node) bool {
	log.Printf("%s: Sending echo to: %s....", e.host.ID(), node.host.ID())

	// create message data
	req := &p2p.EchoRequest{
		MessageData: NewMessageData(e.host.ID().String(), uuid.New().String(), false),
		Message:     fmt.Sprintf("Echo from %s", e.host.ID())}

	s, err := e.host.NewStream(context.Background(), node.host.ID(), echoRequest)
	if err != nil {
		log.Fatal(err)
		return false
	}

	ok := sendDataObject(req, s)

	if !ok {
		return false
	}

	// store request so response handler has access to it
	e.requests[req.MessageData.Id] = req
	log.Printf("%s: Echo to: %s was sent. Message Id: %s, Message: %s", e.host.ID(), node.host.ID(), req.MessageData.Id, req.Message)
	return true
}