Commit 0874c503 authored by Aviv Eyal's avatar Aviv Eyal Committed by Steven Allen
Browse files

Refactoring

parent f1dd3185
...@@ -6,7 +6,6 @@ import ( ...@@ -6,7 +6,6 @@ import (
"fmt" "fmt"
"log" "log"
host "gx/ipfs/QmRS46AyqtpJBsf1zmQdeizSDEzo1qkWR7rdEuPFAv8237/go-libp2p-host"
inet "gx/ipfs/QmbD5yKbXahNvoMqzeuNyKQA9vAs9fUvJg2GXeWU1fVqY5/go-libp2p-net" inet "gx/ipfs/QmbD5yKbXahNvoMqzeuNyKQA9vAs9fUvJg2GXeWU1fVqY5/go-libp2p-net"
uuid "github.com/google/uuid" uuid "github.com/google/uuid"
...@@ -20,20 +19,20 @@ const echoRequest = "/echo/echoreq/0.0.1" ...@@ -20,20 +19,20 @@ const echoRequest = "/echo/echoreq/0.0.1"
const echoResponse = "/echo/echoresp/0.0.1" const echoResponse = "/echo/echoresp/0.0.1"
type EchoProtocol struct { type EchoProtocol struct {
host host.Host // local host node *Node // local host
requests map[string]*p2p.EchoRequest // used to access request data from response handlers requests map[string]*p2p.EchoRequest // used to access request data from response handlers
done chan bool // only for demo purposes to hold main from terminating done chan bool // only for demo purposes to hold main from terminating
} }
func NewEchoProtocol(host host.Host, done chan bool) *EchoProtocol { func NewEchoProtocol(node *Node, done chan bool) *EchoProtocol {
e := EchoProtocol{host: host, requests: make(map[string]*p2p.EchoRequest), done: done} e := EchoProtocol{node: node, requests: make(map[string]*p2p.EchoRequest), done: done}
host.SetStreamHandler(echoRequest, e.onEchoRequest) node.SetStreamHandler(echoRequest, e.onEchoRequest)
host.SetStreamHandler(echoResponse, e.onEchoResponse) node.SetStreamHandler(echoResponse, e.onEchoResponse)
return &e return &e
} }
// remote peer requests handler // remote peer requests handler
func (e *EchoProtocol) onEchoRequest(s inet.Stream) { func (e EchoProtocol) onEchoRequest(s inet.Stream) {
// get request data // get request data
data := &p2p.EchoRequest{} data := &p2p.EchoRequest{}
decoder := protobufCodec.Multicodec(nil).Decoder(bufio.NewReader(s)) decoder := protobufCodec.Multicodec(nil).Decoder(bufio.NewReader(s))
...@@ -49,10 +48,10 @@ func (e *EchoProtocol) onEchoRequest(s inet.Stream) { ...@@ -49,10 +48,10 @@ func (e *EchoProtocol) onEchoRequest(s inet.Stream) {
// send response to request send using the message string he provided // send response to request send using the message string he provided
resp := &p2p.EchoResponse{ resp := &p2p.EchoResponse{
MessageData: NewMessageData(e.host.ID().String(), data.MessageData.Id, false), MessageData: NewMessageData(e.node.ID().String(), data.MessageData.Id, false),
Message: data.Message} Message: data.Message}
s, respErr := e.host.NewStream(context.Background(), s.Conn().RemotePeer(), echoResponse) s, respErr := e.node.NewStream(context.Background(), s.Conn().RemotePeer(), echoResponse)
if respErr != nil { if respErr != nil {
log.Fatal(respErr) log.Fatal(respErr)
return return
...@@ -66,7 +65,7 @@ func (e *EchoProtocol) onEchoRequest(s inet.Stream) { ...@@ -66,7 +65,7 @@ func (e *EchoProtocol) onEchoRequest(s inet.Stream) {
} }
// remote echo response handler // remote echo response handler
func (e *EchoProtocol) onEchoResponse(s inet.Stream) { func (e EchoProtocol) onEchoResponse(s inet.Stream) {
data := &p2p.EchoResponse{} data := &p2p.EchoResponse{}
decoder := protobufCodec.Multicodec(nil).Decoder(bufio.NewReader(s)) decoder := protobufCodec.Multicodec(nil).Decoder(bufio.NewReader(s))
err := decoder.Decode(data) err := decoder.Decode(data)
...@@ -90,15 +89,15 @@ func (e *EchoProtocol) onEchoResponse(s inet.Stream) { ...@@ -90,15 +89,15 @@ func (e *EchoProtocol) onEchoResponse(s inet.Stream) {
e.done <- true e.done <- true
} }
func (e *EchoProtocol) Echo(node *Node) bool { func (e EchoProtocol) Echo(node *Node) bool {
log.Printf("%s: Sending echo to: %s....", e.host.ID(), node.host.ID()) log.Printf("%s: Sending echo to: %s....", e.node.ID(), node.ID())
// create message data // create message data
req := &p2p.EchoRequest{ req := &p2p.EchoRequest{
MessageData: NewMessageData(e.host.ID().String(), uuid.New().String(), false), MessageData: NewMessageData(e.node.ID().String(), uuid.New().String(), false),
Message: fmt.Sprintf("Echo from %s", e.host.ID())} Message: fmt.Sprintf("Echo from %s", e.node.ID())}
s, err := e.host.NewStream(context.Background(), node.host.ID(), echoRequest) s, err := e.node.NewStream(context.Background(), node.ID(), echoRequest)
if err != nil { if err != nil {
log.Fatal(err) log.Fatal(err)
return false return false
...@@ -112,6 +111,6 @@ func (e *EchoProtocol) Echo(node *Node) bool { ...@@ -112,6 +111,6 @@ func (e *EchoProtocol) Echo(node *Node) bool {
// store request so response handler has access to it // store request so response handler has access to it
e.requests[req.MessageData.Id] = req e.requests[req.MessageData.Id] = req
log.Printf("%s: Echo to: %s was sent. Message Id: %s, Message: %s", e.host.ID(), node.host.ID(), req.MessageData.Id, req.Message) log.Printf("%s: Echo to: %s was sent. Message Id: %s, Message: %s", e.node.ID(), node.ID(), req.MessageData.Id, req.Message)
return true return true
} }
...@@ -41,10 +41,10 @@ func main() { ...@@ -41,10 +41,10 @@ func main() {
// Make 2 hosts // Make 2 hosts
h1 := makeRandomNode(port1, done) h1 := makeRandomNode(port1, done)
h2 := makeRandomNode(port2, done) h2 := makeRandomNode(port2, done)
h1.host.Peerstore().AddAddrs(h2.host.ID(), h2.host.Addrs(), ps.PermanentAddrTTL) h1.Peerstore().AddAddrs(h2.ID(), h2.Addrs(), ps.PermanentAddrTTL)
h2.host.Peerstore().AddAddrs(h1.host.ID(), h1.host.Addrs(), ps.PermanentAddrTTL) h2.Peerstore().AddAddrs(h1.ID(), h1.Addrs(), ps.PermanentAddrTTL)
log.Printf("This is a conversation between %s and %s\n", h1.host.ID(), h2.host.ID()) log.Printf("This is a conversation between %s and %s\n", h1.ID(), h2.ID())
// send messages using the protocols // send messages using the protocols
h1.Ping(h2) h1.Ping(h2)
......
package main package main
import ( import (
"bufio"
"log"
"time"
host "gx/ipfs/QmRS46AyqtpJBsf1zmQdeizSDEzo1qkWR7rdEuPFAv8237/go-libp2p-host" host "gx/ipfs/QmRS46AyqtpJBsf1zmQdeizSDEzo1qkWR7rdEuPFAv8237/go-libp2p-host"
inet "gx/ipfs/QmbD5yKbXahNvoMqzeuNyKQA9vAs9fUvJg2GXeWU1fVqY5/go-libp2p-net" peer "gx/ipfs/QmXYjuNuxVzXKJCfWasQk1RqkhVLDM9jtUKhqc2WPQmFSB/go-libp2p-peer"
protobufCodec "github.com/multiformats/go-multicodec/protobuf"
p2p "github.com/libp2p/go-libp2p/examples/multipro/pb"
) )
// node version func (n Node) signData(data []byte) ([]byte, error) {
const clientVersion = "go-p2p-node/0.0.1" key := n.Peerstore().PrivKey(n.ID())
res, err := key.Sign(data)
// helper method - writes a protobuf go data object to a network stream return res, err
// data - reference of protobuf go data object to send (not the object itself)
// s - network stream to write the data to
func sendDataObject(data interface{}, s inet.Stream) bool {
writer := bufio.NewWriter(s)
enc := protobufCodec.Multicodec(nil).Encoder(writer)
err := enc.Encode(data)
if err != nil {
log.Fatal(err)
return false
}
writer.Flush()
return true
} }
// helper method - generate message data shared between all node's p2p protocols func (n Node) verifyData(data []byte, signature []byte, signerHostId peer.ID) bool {
// nodeId - message author id key := n.Peerstore().PubKey(signerHostId)
// messageId - unique for requests, copied from request for responses res, err := key.Verify(data, signature)
func NewMessageData(nodeId string, messageId string, gossip bool) *p2p.MessageData { return res == true && err == nil
return &p2p.MessageData{ClientVersion: clientVersion,
NodeId: nodeId,
Timestamp: time.Now().Unix(),
Id: messageId,
Gossip: gossip}
} }
// Node type - implements one or more p2p protocols // Node type - implements one or more p2p protocols
type Node struct { type Node struct {
host host.Host // lib-p2p host host.Host // lib-p2p host
*PingProtocol // ping protocol impl *PingProtocol // ping protocol impl
*EchoProtocol // echo protocol impl *EchoProtocol // echo protocol impl
} }
// create a new node with its implemented protocols // create a new node with its implemented protocols
func NewNode(host host.Host, done chan bool) *Node { func NewNode(host host.Host, done chan bool) *Node {
return &Node{host: host, node := &Node{Host: host}
PingProtocol: NewPingProtocol(host, done), node.PingProtocol = NewPingProtocol(node, done)
EchoProtocol: NewEchoProtocol(host, done)} node.EchoProtocol = NewEchoProtocol(node, done)
return node
} }
...@@ -6,7 +6,6 @@ import ( ...@@ -6,7 +6,6 @@ import (
"fmt" "fmt"
"log" "log"
host "gx/ipfs/QmRS46AyqtpJBsf1zmQdeizSDEzo1qkWR7rdEuPFAv8237/go-libp2p-host"
inet "gx/ipfs/QmbD5yKbXahNvoMqzeuNyKQA9vAs9fUvJg2GXeWU1fVqY5/go-libp2p-net" inet "gx/ipfs/QmbD5yKbXahNvoMqzeuNyKQA9vAs9fUvJg2GXeWU1fVqY5/go-libp2p-net"
uuid "github.com/google/uuid" uuid "github.com/google/uuid"
...@@ -20,20 +19,20 @@ const pingResponse = "/ping/pingresp/0.0.1" ...@@ -20,20 +19,20 @@ const pingResponse = "/ping/pingresp/0.0.1"
// PingProtocol type // PingProtocol type
type PingProtocol struct { type PingProtocol struct {
host host.Host // local host node *Node // local host
requests map[string]*p2p.PingRequest // used to access request data from response handlers requests map[string]*p2p.PingRequest // used to access request data from response handlers
done chan bool // only for demo purposes to stop main from terminating done chan bool // only for demo purposes to stop main from terminating
} }
func NewPingProtocol(host host.Host, done chan bool) *PingProtocol { func NewPingProtocol(node *Node, done chan bool) *PingProtocol {
p := &PingProtocol{host: host, requests: make(map[string]*p2p.PingRequest), done: done} p := &PingProtocol{node: node, requests: make(map[string]*p2p.PingRequest), done: done}
host.SetStreamHandler(pingRequest, p.onPingRequest) node.SetStreamHandler(pingRequest, p.onPingRequest)
host.SetStreamHandler(pingResponse, p.onPingResponse) node.SetStreamHandler(pingResponse, p.onPingResponse)
return p return p
} }
// remote peer requests handler // remote peer requests handler
func (p *PingProtocol) onPingRequest(s inet.Stream) { func (p PingProtocol) onPingRequest(s inet.Stream) {
// get request data // get request data
data := &p2p.PingRequest{} data := &p2p.PingRequest{}
...@@ -48,10 +47,10 @@ func (p *PingProtocol) onPingRequest(s inet.Stream) { ...@@ -48,10 +47,10 @@ func (p *PingProtocol) onPingRequest(s inet.Stream) {
// send response to sender // send response to sender
log.Printf("%s: Sending ping response to %s. Message id: %s...", s.Conn().LocalPeer(), s.Conn().RemotePeer(), data.MessageData.Id) log.Printf("%s: Sending ping response to %s. Message id: %s...", s.Conn().LocalPeer(), s.Conn().RemotePeer(), data.MessageData.Id)
resp := &p2p.PingResponse{MessageData: NewMessageData(p.host.ID().String(), data.MessageData.Id, false), resp := &p2p.PingResponse{MessageData: NewMessageData(p.node.ID().String(), data.MessageData.Id, false),
Message: fmt.Sprintf("Ping response from %s", p.host.ID())} Message: fmt.Sprintf("Ping response from %s", p.node.ID())}
s, respErr := p.host.NewStream(context.Background(), s.Conn().RemotePeer(), pingResponse) s, respErr := p.node.NewStream(context.Background(), s.Conn().RemotePeer(), pingResponse)
if respErr != nil { if respErr != nil {
log.Fatal(respErr) log.Fatal(respErr)
return return
...@@ -65,7 +64,7 @@ func (p *PingProtocol) onPingRequest(s inet.Stream) { ...@@ -65,7 +64,7 @@ func (p *PingProtocol) onPingRequest(s inet.Stream) {
} }
// remote ping response handler // remote ping response handler
func (p *PingProtocol) onPingResponse(s inet.Stream) { func (p PingProtocol) onPingResponse(s inet.Stream) {
data := &p2p.PingResponse{} data := &p2p.PingResponse{}
decoder := protobufCodec.Multicodec(nil).Decoder(bufio.NewReader(s)) decoder := protobufCodec.Multicodec(nil).Decoder(bufio.NewReader(s))
err := decoder.Decode(data) err := decoder.Decode(data)
...@@ -87,14 +86,14 @@ func (p *PingProtocol) onPingResponse(s inet.Stream) { ...@@ -87,14 +86,14 @@ func (p *PingProtocol) onPingResponse(s inet.Stream) {
p.done <- true p.done <- true
} }
func (p *PingProtocol) Ping(node *Node) bool { func (p PingProtocol) Ping(node *Node) bool {
log.Printf("%s: Sending ping to: %s....", p.host.ID(), node.host.ID()) log.Printf("%s: Sending ping to: %s....", p.node.ID(), node.ID())
// create message data // create message data
req := &p2p.PingRequest{MessageData: NewMessageData(p.host.ID().String(), uuid.New().String(), false), req := &p2p.PingRequest{MessageData: NewMessageData(p.node.ID().String(), uuid.New().String(), false),
Message: fmt.Sprintf("Ping from %s", p.host.ID())} Message: fmt.Sprintf("Ping from %s", p.node.ID())}
s, err := p.host.NewStream(context.Background(), node.host.ID(), pingRequest) s, err := p.node.NewStream(context.Background(), node.Host.ID(), pingRequest)
if err != nil { if err != nil {
log.Fatal(err) log.Fatal(err)
return false return false
...@@ -108,6 +107,6 @@ func (p *PingProtocol) Ping(node *Node) bool { ...@@ -108,6 +107,6 @@ func (p *PingProtocol) Ping(node *Node) bool {
// store ref request so response handler has access to it // store ref request so response handler has access to it
p.requests[req.MessageData.Id] = req p.requests[req.MessageData.Id] = req
log.Printf("%s: Ping to: %s was sent. Message Id: %s, Message: %s", p.host.ID(), node.host.ID(), req.MessageData.Id, req.Message) log.Printf("%s: Ping to: %s was sent. Message Id: %s, Message: %s", p.node.ID(), node.ID(), req.MessageData.Id, req.Message)
return true return true
} }
package main
import (
"bufio"
p2p "github.com/libp2p/go-libp2p/examples/multipro/pb"
protobufCodec "github.com/multiformats/go-multicodec/protobuf"
inet "gx/ipfs/QmbD5yKbXahNvoMqzeuNyKQA9vAs9fUvJg2GXeWU1fVqY5/go-libp2p-net"
"log"
"time"
//host "gx/ipfs/QmRS46AyqtpJBsf1zmQdeizSDEzo1qkWR7rdEuPFAv8237/go-libp2p-host"
//"bytes"
)
// node version
const clientVersion = "go-p2p-node/0.0.1"
// helper method - writes a protobuf go data object to a network stream
// data - reference of protobuf go data object to send (not the object itself)
// s - network stream to write the data to
func sendDataObject(data interface{}, s inet.Stream) bool {
writer := bufio.NewWriter(s)
enc := protobufCodec.Multicodec(nil).Encoder(writer)
err := enc.Encode(data)
if err != nil {
log.Fatal(err)
return false
}
writer.Flush()
return true
}
// helper method - generate message data shared between all node's p2p protocols
// nodeId - message author id
// messageId - unique for requests, copied from request for responses
func NewMessageData(nodeId string, messageId string, gossip bool) *p2p.MessageData {
return &p2p.MessageData{ClientVersion: clientVersion,
NodeId: nodeId,
Timestamp: time.Now().Unix(),
Id: messageId,
Gossip: gossip}
}
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment