echo.go 4.41 KB
Newer Older
1
2
3
4
5
6
7
8
package main

import (
	"bufio"
	"context"
	"fmt"
	"log"

Steven Allen's avatar
Steven Allen committed
9
	inet "github.com/libp2p/go-libp2p-net"
10

Steven Allen's avatar
Steven Allen committed
11
	"github.com/libp2p/go-libp2p-host"
Steven Allen's avatar
Steven Allen committed
12
	p2p "github.com/libp2p/go-libp2p/examples/multipro/pb"
13
	protobufCodec "github.com/multiformats/go-multicodec/protobuf"
14
	uuid "github.com/satori/go.uuid"
15
16
17
18
19
20
21
)

// pattern: /protocol-name/request-or-response-message/version
const echoRequest = "/echo/echoreq/0.0.1"
const echoResponse = "/echo/echoresp/0.0.1"

type EchoProtocol struct {
Aviv Eyal's avatar
Aviv Eyal committed
22
	node     *Node                       // local host
Aviv Eyal's avatar
Aviv Eyal committed
23
	requests map[string]*p2p.EchoRequest // used to access request data from response handlers
24
25
26
	done     chan bool                   // only for demo purposes to hold main from terminating
}

Aviv Eyal's avatar
Aviv Eyal committed
27
28
29
30
func NewEchoProtocol(node *Node, done chan bool) *EchoProtocol {
	e := EchoProtocol{node: node, requests: make(map[string]*p2p.EchoRequest), done: done}
	node.SetStreamHandler(echoRequest, e.onEchoRequest)
	node.SetStreamHandler(echoResponse, e.onEchoResponse)
Aviv Eyal's avatar
Aviv Eyal committed
31
32
33
34

	// design note: to implement fire-and-forget style messages you may just skip specifying a response callback.
	// a fire-and-forget message will just include a request and not specify a response object

35
36
37
38
	return &e
}

// remote peer requests handler
39
func (e *EchoProtocol) onEchoRequest(s inet.Stream) {
40
41
42
43
44
	// get request data
	data := &p2p.EchoRequest{}
	decoder := protobufCodec.Multicodec(nil).Decoder(bufio.NewReader(s))
	err := decoder.Decode(data)
	if err != nil {
Aviv Eyal's avatar
Aviv Eyal committed
45
		log.Println(err)
46
47
48
49
50
		return
	}

	log.Printf("%s: Received echo request from %s. Message: %s", s.Conn().LocalPeer(), s.Conn().RemotePeer(), data.Message)

51
52
53
	valid := e.node.authenticateMessage(data, data.MessageData)

	if !valid {
Aviv Eyal's avatar
Aviv Eyal committed
54
		log.Println("Failed to authenticate message")
55
56
57
		return
	}

58
	log.Printf("%s: Sending echo response to %s. Message id: %s...", s.Conn().LocalPeer(), s.Conn().RemotePeer(), data.MessageData.Id)
Aviv Eyal's avatar
Aviv Eyal committed
59

60
	// send response to the request using the message string he provided
Aviv Eyal's avatar
Aviv Eyal committed
61

62
	resp := &p2p.EchoResponse{
Aviv Eyal's avatar
Aviv Eyal committed
63
		MessageData: e.node.NewMessageData(data.MessageData.Id, false),
64
65
		Message:     data.Message}

Aviv Eyal's avatar
Aviv Eyal committed
66
67
68
	// sign the data
	signature, err := e.node.signProtoMessage(resp)
	if err != nil {
Aviv Eyal's avatar
Aviv Eyal committed
69
		log.Println("failed to sign response")
Aviv Eyal's avatar
Aviv Eyal committed
70
71
72
73
74
75
		return
	}

	// add the signature to the message
	resp.MessageData.Sign = string(signature)

Aviv Eyal's avatar
Aviv Eyal committed
76
	s, respErr := e.node.NewStream(context.Background(), s.Conn().RemotePeer(), echoResponse)
77
	if respErr != nil {
Aviv Eyal's avatar
Aviv Eyal committed
78
		log.Println(respErr)
79
80
81
		return
	}

Aviv Eyal's avatar
Aviv Eyal committed
82
	ok := e.node.sendProtoMessage(resp, s)
83
84
85
86
87
88
89

	if ok {
		log.Printf("%s: Echo response to %s sent.", s.Conn().LocalPeer().String(), s.Conn().RemotePeer().String())
	}
}

// remote echo response handler
90
func (e *EchoProtocol) onEchoResponse(s inet.Stream) {
91
92
93
94
95
96
97
	data := &p2p.EchoResponse{}
	decoder := protobufCodec.Multicodec(nil).Decoder(bufio.NewReader(s))
	err := decoder.Decode(data)
	if err != nil {
		return
	}

98
99
100
101
	// authenticate message content
	valid := e.node.authenticateMessage(data, data.MessageData)

	if !valid {
Aviv Eyal's avatar
Aviv Eyal committed
102
		log.Println("Failed to authenticate message")
103
104
105
		return
	}

106
107
108
109
110
111
	// locate request data and remove it if found
	req, ok := e.requests[data.MessageData.Id]
	if ok {
		// remove request from map as we have processed it here
		delete(e.requests, data.MessageData.Id)
	} else {
Aviv Eyal's avatar
Aviv Eyal committed
112
		log.Println("Failed to locate request data boject for response")
113
114
115
		return
	}

116
117
118
	if req.Message != data.Message {
		log.Fatalln("Expected echo to respond with request message")
	}
119

Aviv Eyal's avatar
Aviv Eyal committed
120
	log.Printf("%s: Received echo response from %s. Message id:%s. Message: %s.", s.Conn().LocalPeer(), s.Conn().RemotePeer(), data.MessageData.Id, data.Message)
121
122
123
	e.done <- true
}

124
func (e *EchoProtocol) Echo(host host.Host) bool {
Aviv Eyal's avatar
Aviv Eyal committed
125
	log.Printf("%s: Sending echo to: %s....", e.node.ID(), host.ID())
126
127
128

	// create message data
	req := &p2p.EchoRequest{
129
		MessageData: e.node.NewMessageData(uuid.Must(uuid.NewV4()).String(), false),
Aviv Eyal's avatar
Aviv Eyal committed
130
		Message:     fmt.Sprintf("Echo from %s", e.node.ID())}
131

Aviv Eyal's avatar
Aviv Eyal committed
132
133
	signature, err := e.node.signProtoMessage(req)
	if err != nil {
Aviv Eyal's avatar
Aviv Eyal committed
134
		log.Println("failed to sign message")
Aviv Eyal's avatar
Aviv Eyal committed
135
136
137
138
139
140
141
		return false
	}

	// add the signature to the message
	req.MessageData.Sign = string(signature)

	s, err := e.node.NewStream(context.Background(), host.ID(), echoRequest)
142
	if err != nil {
Aviv Eyal's avatar
Aviv Eyal committed
143
		log.Println(err)
144
145
146
		return false
	}

Aviv Eyal's avatar
Aviv Eyal committed
147
	ok := e.node.sendProtoMessage(req, s)
148
149
150
151
152
153
154

	if !ok {
		return false
	}

	// store request so response handler has access to it
	e.requests[req.MessageData.Id] = req
Aviv Eyal's avatar
Aviv Eyal committed
155
	log.Printf("%s: Echo to: %s was sent. Message Id: %s, Message: %s", e.node.ID(), host.ID(), req.MessageData.Id, req.Message)
156
157
	return true
}