Unverified Commit d8ee5b3c authored by Steven Allen's avatar Steven Allen Committed by GitHub
Browse files

Merge pull request #381 from libp2p/feat/extract-examples

extract libp2p examples to go-libp2p-examples
parents d9712a3b 0ee6c5a2
# building p2p.pb.go:
protoc --gogo_out=. --proto_path=../../../../../../:/usr/local/opt/protobuf/include:. *.proto
package main
import (
"bufio"
"context"
"fmt"
"log"
uuid "github.com/google/uuid"
"github.com/libp2p/go-libp2p-host"
inet "github.com/libp2p/go-libp2p-net"
p2p "github.com/libp2p/go-libp2p/examples/multipro/pb"
protobufCodec "github.com/multiformats/go-multicodec/protobuf"
)
// pattern: /protocol-name/request-or-response-message/version
const pingRequest = "/ping/pingreq/0.0.1"
const pingResponse = "/ping/pingresp/0.0.1"
// PingProtocol type
type PingProtocol struct {
node *Node // local host
requests map[string]*p2p.PingRequest // used to access request data from response handlers
done chan bool // only for demo purposes to stop main from terminating
}
func NewPingProtocol(node *Node, done chan bool) *PingProtocol {
p := &PingProtocol{node: node, requests: make(map[string]*p2p.PingRequest), done: done}
node.SetStreamHandler(pingRequest, p.onPingRequest)
node.SetStreamHandler(pingResponse, p.onPingResponse)
return p
}
// remote peer requests handler
func (p *PingProtocol) onPingRequest(s inet.Stream) {
// get request data
data := &p2p.PingRequest{}
decoder := protobufCodec.Multicodec(nil).Decoder(bufio.NewReader(s))
err := decoder.Decode(data)
if err != nil {
log.Println(err)
return
}
log.Printf("%s: Received ping request from %s. Message: %s", s.Conn().LocalPeer(), s.Conn().RemotePeer(), data.Message)
valid := p.node.authenticateMessage(data, data.MessageData)
if !valid {
log.Println("Failed to authenticate message")
return
}
// generate response message
log.Printf("%s: Sending ping response to %s. Message id: %s...", s.Conn().LocalPeer(), s.Conn().RemotePeer(), data.MessageData.Id)
resp := &p2p.PingResponse{MessageData: p.node.NewMessageData(data.MessageData.Id, false),
Message: fmt.Sprintf("Ping response from %s", p.node.ID())}
// sign the data
signature, err := p.node.signProtoMessage(resp)
if err != nil {
log.Println("failed to sign response")
return
}
// add the signature to the message
resp.MessageData.Sign = string(signature)
// send the response
s, respErr := p.node.NewStream(context.Background(), s.Conn().RemotePeer(), pingResponse)
if respErr != nil {
log.Println(respErr)
return
}
ok := p.node.sendProtoMessage(resp, s)
if ok {
log.Printf("%s: Ping response to %s sent.", s.Conn().LocalPeer().String(), s.Conn().RemotePeer().String())
}
}
// remote ping response handler
func (p *PingProtocol) onPingResponse(s inet.Stream) {
data := &p2p.PingResponse{}
decoder := protobufCodec.Multicodec(nil).Decoder(bufio.NewReader(s))
err := decoder.Decode(data)
if err != nil {
return
}
valid := p.node.authenticateMessage(data, data.MessageData)
if !valid {
log.Println("Failed to authenticate message")
return
}
// locate request data and remove it if found
_, ok := p.requests[data.MessageData.Id]
if ok {
// remove request from map as we have processed it here
delete(p.requests, data.MessageData.Id)
} else {
log.Println("Failed to locate request data boject for response")
return
}
log.Printf("%s: Received ping response from %s. Message id:%s. Message: %s.", s.Conn().LocalPeer(), s.Conn().RemotePeer(), data.MessageData.Id, data.Message)
p.done <- true
}
func (p *PingProtocol) Ping(host host.Host) bool {
log.Printf("%s: Sending ping to: %s....", p.node.ID(), host.ID())
// create message data
req := &p2p.PingRequest{MessageData: p.node.NewMessageData(uuid.New().String(), false),
Message: fmt.Sprintf("Ping from %s", p.node.ID())}
// sign the data
signature, err := p.node.signProtoMessage(req)
if err != nil {
log.Println("failed to sign pb data")
return false
}
// add the signature to the message
req.MessageData.Sign = string(signature)
s, err := p.node.NewStream(context.Background(), host.ID(), pingRequest)
if err != nil {
log.Println(err)
return false
}
ok := p.node.sendProtoMessage(req, s)
if !ok {
return false
}
// store ref request so response handler has access to it
p.requests[req.MessageData.Id] = req
log.Printf("%s: Ping to: %s was sent. Message Id: %s, Message: %s", p.node.ID(), host.ID(), req.MessageData.Id, req.Message)
return true
}
# Protocol Multiplexing using multicodecs with libp2p
This examples shows how to use multicodecs (i.e. json) to encode and transmit information between LibP2P hosts using LibP2P Streams.
Multicodecs present a common interface, making it very easy to swap the codec implementation if needed.
This example expects that you area already familiar with the [echo example](https://github.com/libp2p/go-libp2p/tree/master/examples/echo).
## Build
From `go-libp2p` base folder:
```
> make deps-protocol-muxing
> go build -o multicodecs ./examples/protocol-multiplexing-with-multicodecs
```
## Usage
```
> ./multicodecs
```
## Details
The example creates two LibP2P Hosts. Host1 opens a stream to Host2. Host2 has an `StreamHandler` to deal with the incoming stream. This is covered in the `echo` example.
Both hosts simulate a conversation. But rather than sending raw messages on the stream, each message in the conversation is encoded under a `json` object (using the `json` multicodec). For example:
```
{
"Msg": "This is the message",
"Index": 3,
"HangUp": false
}
```
The stream lasts until one of the sides closes it when the HangUp field is `true`.
package main
import (
"bufio"
"context"
"fmt"
"log"
"math/rand"
"time"
host "github.com/libp2p/go-libp2p-host"
inet "github.com/libp2p/go-libp2p-net"
ps "github.com/libp2p/go-libp2p-peerstore"
libp2p "github.com/libp2p/go-libp2p"
multicodec "github.com/multiformats/go-multicodec"
json "github.com/multiformats/go-multicodec/json"
)
const proto = "/example/1.0.0"
// Message is a serializable/encodable object that we will send
// on a Stream.
type Message struct {
Msg string
Index int
HangUp bool
}
// streamWrap wraps a libp2p stream. We encode/decode whenever we
// write/read from a stream, so we can just carry the encoders
// and bufios with us
type WrappedStream struct {
stream inet.Stream
enc multicodec.Encoder
dec multicodec.Decoder
w *bufio.Writer
r *bufio.Reader
}
// wrapStream takes a stream and complements it with r/w bufios and
// decoder/encoder. In order to write raw data to the stream we can use
// wrap.w.Write(). To encode something into it we can wrap.enc.Encode().
// Finally, we should wrap.w.Flush() to actually send the data. Handling
// incoming data works similarly with wrap.r.Read() for raw-reading and
// wrap.dec.Decode() to decode.
func WrapStream(s inet.Stream) *WrappedStream {
reader := bufio.NewReader(s)
writer := bufio.NewWriter(s)
// This is where we pick our specific multicodec. In order to change the
// codec, we only need to change this place.
// See https://godoc.org/github.com/multiformats/go-multicodec/json
dec := json.Multicodec(false).Decoder(reader)
enc := json.Multicodec(false).Encoder(writer)
return &WrappedStream{
stream: s,
r: reader,
w: writer,
enc: enc,
dec: dec,
}
}
// messages that will be sent between the hosts.
var conversationMsgs = []string{
"Hello!",
"Hey!",
"How are you doing?",
"Very good! It is great that you can send data on a stream to me!",
"Not only that, the data is encoded in a JSON object.",
"Yeah, and we are using the multicodecs interface to encode and decode.",
"This way we could swap it easily for, say, cbor, or msgpack!",
"Let's leave that as an excercise for the reader...",
"Agreed, our last message should activate the HangUp flag",
"Yes, and the example code will close streams. So sad :(. Bye!",
}
func makeRandomHost(port int) host.Host {
h, err := libp2p.New(context.Background(), libp2p.ListenAddrStrings(fmt.Sprintf("/ip4/127.0.0.1/tcp/%d", port)))
if err != nil {
panic(err)
}
return h
}
func main() {
// Choose random ports between 10000-10100
rand.Seed(666)
port1 := rand.Intn(100) + 10000
port2 := port1 + 1
// Make 2 hosts
h1 := makeRandomHost(port1)
h2 := makeRandomHost(port2)
h1.Peerstore().AddAddrs(h2.ID(), h2.Addrs(), ps.PermanentAddrTTL)
h2.Peerstore().AddAddrs(h1.ID(), h1.Addrs(), ps.PermanentAddrTTL)
log.Printf("This is a conversation between %s and %s\n", h1.ID(), h2.ID())
// Define a stream handler for host number 2
h2.SetStreamHandler(proto, func(stream inet.Stream) {
log.Printf("%s: Received a stream", h2.ID())
wrappedStream := WrapStream(stream)
defer stream.Close()
handleStream(wrappedStream)
})
// Create new stream from h1 to h2 and start the conversation
stream, err := h1.NewStream(context.Background(), h2.ID(), proto)
if err != nil {
log.Fatal(err)
}
wrappedStream := WrapStream(stream)
// This sends the first message
sendMessage(0, wrappedStream)
// We keep the conversation on the created stream so we launch
// this to handle any responses
handleStream(wrappedStream)
// When we are done, close the stream on our side and exit.
stream.Close()
}
// receiveMessage reads and decodes a message from the stream
func receiveMessage(ws *WrappedStream) (*Message, error) {
var msg Message
err := ws.dec.Decode(&msg)
if err != nil {
return nil, err
}
return &msg, nil
}
// sendMessage encodes and writes a message to the stream
func sendMessage(index int, ws *WrappedStream) error {
msg := &Message{
Msg: conversationMsgs[index],
Index: index,
HangUp: index >= len(conversationMsgs)-1,
}
err := ws.enc.Encode(msg)
// Because output is buffered with bufio, we need to flush!
ws.w.Flush()
return err
}
// handleStream is a for loop which receives and then sends a message
// an artificial delay of 500ms happens in-between.
// When Message.HangUp is true, it exists. This will close the stream
// on one of the sides. The other side's receiveMessage() will error
// with EOF, thus also breaking out from the loop.
func handleStream(ws *WrappedStream) {
for {
// Read
msg, err := receiveMessage(ws)
if err != nil {
break
}
pid := ws.stream.Conn().LocalPeer()
log.Printf("%s says: %s\n", pid, msg.Msg)
time.Sleep(500 * time.Millisecond)
if msg.HangUp {
break
}
// Send response
err = sendMessage(msg.Index+1, ws)
if err != nil {
break
}
}
}
...@@ -160,12 +160,6 @@ ...@@ -160,12 +160,6 @@
"name": "go-smux-yamux", "name": "go-smux-yamux",
"version": "2.0.3" "version": "2.0.3"
}, },
{
"author": "whyrusleeping",
"hash": "QmQvJiADDe7JR4m968MwXobTCCzUqQkP87aRHe29MEBGHV",
"name": "go-logging",
"version": "0.0.0"
},
{ {
"author": "whyrusleeping", "author": "whyrusleeping",
"hash": "QmWzjXAyBTygw6CeCTUnhJzhFucfxY5FJivSoiGuiSbPjS", "hash": "QmWzjXAyBTygw6CeCTUnhJzhFucfxY5FJivSoiGuiSbPjS",
...@@ -202,12 +196,6 @@ ...@@ -202,12 +196,6 @@
"name": "go-smux-multiplex", "name": "go-smux-multiplex",
"version": "3.0.11" "version": "3.0.11"
}, },
{
"author": "multiformats",
"hash": "QmRDePEiL4Yupq5EkcK3L3ko3iMgYaqUdLu7xc1kqs7dnV",
"name": "go-multicodec",
"version": "0.1.5"
},
{ {
"author": "whyrusleeping", "author": "whyrusleeping",
"hash": "QmX4UrmHGPnFxwfsunZjPbykzyv8Frg9AVmNariXqrLsMs", "hash": "QmX4UrmHGPnFxwfsunZjPbykzyv8Frg9AVmNariXqrLsMs",
...@@ -243,12 +231,6 @@ ...@@ -243,12 +231,6 @@
"hash": "Qma7AuxEA7dd1wAy95hTxXgxy4q7mU4Pyd1x4PRAzGP1fs", "hash": "Qma7AuxEA7dd1wAy95hTxXgxy4q7mU4Pyd1x4PRAzGP1fs",
"name": "go-libp2p-transport-upgrader", "name": "go-libp2p-transport-upgrader",
"version": "0.1.5" "version": "0.1.5"
},
{
"author": "google",
"hash": "QmSSeQqc5QeuefkaM6JFV5tSF9knLUkXKVhW1eYRiqe72W",
"name": "uuid",
"version": "0.1.0"
} }
], ],
"gxVersion": "0.4.0", "gxVersion": "0.4.0",
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment