Commit c5aa669d authored by Jeromy Johnson's avatar Jeromy Johnson Committed by GitHub
Browse files

Merge pull request #185 from libp2p/libp2p-examples

Libp2p examples
parents 74390ca5 93802a40
*.swp *.swp
examples/echo/echo examples/echo/echo
examples/multicodecs/multicodecs
...@@ -11,7 +11,8 @@ install: true ...@@ -11,7 +11,8 @@ install: true
before_install: before_install:
- make deps - make deps
- go get -u github.com/multiformats/go-multicodec
- go get -u github.com/jbenet/go-msgio
#- go vet ./... #- go vet ./...
script: script:
- go test ./... -v - go test ./... -v
......
...@@ -2,6 +2,10 @@ gx: ...@@ -2,6 +2,10 @@ gx:
go get github.com/whyrusleeping/gx go get github.com/whyrusleeping/gx
go get github.com/whyrusleeping/gx-go go get github.com/whyrusleeping/gx-go
deps-examples: deps
go get -u github.com/multiformats/go-multicodec
go get -u github.com/jbenet/go-msgio
deps: gx deps: gx
gx --verbose install --global gx --verbose install --global
gx-go rewrite gx-go rewrite
......
...@@ -25,18 +25,19 @@ From `go-libp2p` base folder: ...@@ -25,18 +25,19 @@ From `go-libp2p` base folder:
## Usage ## Usage
``` ```
> ./echo -l 1235 > ./echo -l 10000
2016/11/10 10:45:37 I am /ip4/127.0.0.1/tcp/1235/ipfs/QmNtX1cvrm2K6mQmMEaMxAuB4rTexhd87vpYVot4sEZzxc 2017/03/15 14:11:32 I am /ip4/127.0.0.1/tcp/10000/ipfs/QmYo41GybvrXk8y8Xnm1P7pfA4YEXCpfnLyzgRPnNbG35e
2016/11/10 10:45:37 listening for connections 2017/03/15 14:11:32 Now run "./echo -l 10001 -d /ip4/127.0.0.1/tcp/10000/ipfs/QmYo41GybvrXk8y8Xnm1P7pfA4YEXCpfnLyzgRPnNbG35e -secio" on a different terminal
2017/03/15 14:11:32 listening for connections
``` ```
The listener libp2p host will print its `Multiaddress`, which indicates how it The listener libp2p host will print its `Multiaddress`, which indicates how it
can be reached (ip4+tcp) and its randomly generated ID (`QmNtX1cv...`) can be reached (ip4+tcp) and its randomly generated ID (`QmYo41Gyb...`)
Now, launch another node that talks to the listener: Now, launch another node that talks to the listener:
``` ```
> ./echo -d /ip4/127.0.0.1/tcp/1235/ipfs/QmNtX1cvrm2K6mQmMEaMxAuB4rTexhd87vpYVot4sEZzxc -l 1236 > ./echo -l 10001 -d /ip4/127.0.0.1/tcp/10000/ipfs/QmYo41GybvrXk8y8Xnm1P7pfA4YEXCpfnLyzgRPnNbG35e
``` ```
The new node with send the message `"Hello, world!"` to the The new node with send the message `"Hello, world!"` to the
...@@ -61,9 +62,9 @@ In order to create the swarm (and a `basichost`), the example needs: ...@@ -61,9 +62,9 @@ In order to create the swarm (and a `basichost`), the example needs:
* An * An
[ipfs-procotol ID](https://godoc.org/github.com/libp2p/go-libp2p-peer#ID) [ipfs-procotol ID](https://godoc.org/github.com/libp2p/go-libp2p-peer#ID)
like `QmNtX1cvrm2K6mQmMEaMxAuB4rTexhd87vpYVot4sEZzxc`. The example like `QmNtX1cvrm2K6mQmMEaMxAuB4rTexhd87vpYVot4sEZzxc`. The example
autogenerates this on every run. An optional key-pair to secure autogenerates a key pair on every run and uses an ID extracted from the
communications can be added to it. The example autogenerates them when public key (the hash of the public key). When using `-secio`, it uses
using `-secio`. the key pair to encrypt communications.
* A [Multiaddress](https://godoc.org/github.com/multiformats/go-multiaddr), * A [Multiaddress](https://godoc.org/github.com/multiformats/go-multiaddr),
which indicates how to reach this peer. There can be several of them which indicates how to reach this peer. There can be several of them
(using different protocols or locations for example). Example: (using different protocols or locations for example). Example:
......
package main package main
import ( import (
"bufio"
"context" "context"
"flag" "flag"
"fmt" "fmt"
"io/ioutil" "io/ioutil"
"log" "log"
"math/rand"
"strings"
"time"
golog "github.com/ipfs/go-log" golog "github.com/ipfs/go-log"
crypto "github.com/libp2p/go-libp2p-crypto"
host "github.com/libp2p/go-libp2p-host" host "github.com/libp2p/go-libp2p-host"
net "github.com/libp2p/go-libp2p-net" net "github.com/libp2p/go-libp2p-net"
peer "github.com/libp2p/go-libp2p-peer" peer "github.com/libp2p/go-libp2p-peer"
pstore "github.com/libp2p/go-libp2p-peerstore" pstore "github.com/libp2p/go-libp2p-peerstore"
swarm "github.com/libp2p/go-libp2p-swarm" swarm "github.com/libp2p/go-libp2p-swarm"
bhost "github.com/libp2p/go-libp2p/p2p/host/basic"
testutil "github.com/libp2p/go-testutil"
ma "github.com/multiformats/go-multiaddr" ma "github.com/multiformats/go-multiaddr"
gologging "github.com/whyrusleeping/go-logging" gologging "github.com/whyrusleeping/go-logging"
peerstore "github.com/libp2p/go-libp2p-peerstore"
bhost "github.com/libp2p/go-libp2p/p2p/host/basic"
) )
// create a 'Host' with a random peer to listen on the given address // makeBasicHost creates a LibP2P host with a random peer ID listening on the
func makeBasicHost(listen string, secio bool) (host.Host, error) { // given multiaddress. It will use secio if secio is true.
addr, err := ma.NewMultiaddr(listen) func makeBasicHost(listenPort int, secio bool) (host.Host, error) {
// Generate a key pair for this host. We will use it at least
// to obtain a valid host ID.
priv, pub, err := crypto.GenerateKeyPair(crypto.RSA, 2048)
if err != nil {
return nil, err
}
// Obtain Peer ID from public key
pid, err := peer.IDFromPublicKey(pub)
if err != nil { if err != nil {
return nil, err return nil, err
} }
// Create a multiaddress
addr, err := ma.NewMultiaddr(fmt.Sprintf("/ip4/127.0.0.1/tcp/%d", listenPort))
if err != nil {
return nil, err
}
// Create a peerstore
ps := pstore.NewPeerstore() ps := pstore.NewPeerstore()
var pid peer.ID
// If using secio, we add the keys to the peerstore
// for this peer ID.
if secio { if secio {
ident, err := testutil.RandIdentity() ps.AddPrivKey(pid, priv)
if err != nil { ps.AddPubKey(pid, pub)
return nil, err
}
ident.PrivateKey()
ps.AddPrivKey(ident.ID(), ident.PrivateKey())
ps.AddPubKey(ident.ID(), ident.PublicKey())
pid = ident.ID()
} else {
fakepid, err := testutil.RandPeerID()
if err != nil {
return nil, err
}
pid = fakepid
} }
ctx := context.Background() // Create swarm (implements libP2P Network)
netwrk, err := swarm.NewNetwork(
context.Background(),
[]ma.Multiaddr{addr},
pid,
ps,
nil)
// create a new swarm to be used by the service host basicHost := bhost.New(netwrk)
netw, err := swarm.NewNetwork(ctx, []ma.Multiaddr{addr}, pid, ps, nil)
if err != nil { // Build host multiaddress
return nil, err hostAddr, _ := ma.NewMultiaddr(fmt.Sprintf("/ipfs/%s", basicHost.ID().Pretty()))
// Now we can build a full multiaddress to reach this host
// by encapsulating both addresses:
fullAddr := addr.Encapsulate(hostAddr)
log.Printf("I am %s\n", fullAddr)
if secio {
log.Printf("Now run \"./echo -l %d -d %s -secio\" on a different terminal\n", listenPort+1, fullAddr)
} else {
log.Printf("Now run \"./echo -l %d -d %s\" on a different terminal\n", listenPort+1, fullAddr)
} }
log.Printf("I am %s/ipfs/%s\n", addr, pid.Pretty()) return basicHost, nil
return bhost.New(netw), nil
} }
func main() { func main() {
rand.Seed(time.Now().UnixNano()) // LibP2P code uses golog to log messages. They log with different
// string IDs (i.e. "swarm"). We can control the verbosity level for
// all loggers with:
golog.SetAllLoggers(gologging.INFO) // Change to DEBUG for extra info golog.SetAllLoggers(gologging.INFO) // Change to DEBUG for extra info
// Parse options from the command line
listenF := flag.Int("l", 0, "wait for incoming connections") listenF := flag.Int("l", 0, "wait for incoming connections")
target := flag.String("d", "", "target peer to dial") target := flag.String("d", "", "target peer to dial")
secio := flag.Bool("secio", false, "enable secio") secio := flag.Bool("secio", false, "enable secio")
flag.Parse() flag.Parse()
if *listenF == 0 { if *listenF == 0 {
log.Fatal("Please provide a port to bind on with -l") log.Fatal("Please provide a port to bind on with -l")
} }
listenaddr := fmt.Sprintf("/ip4/127.0.0.1/tcp/%d", *listenF) // Make a host that listens on the given multiaddress
ha, err := makeBasicHost(*listenF, *secio)
ha, err := makeBasicHost(listenaddr, *secio)
if err != nil { if err != nil {
log.Fatal(err) log.Fatal(err)
} }
// Set a stream handler on host A // Set a stream handler on host A. /echo/1.0.0 is
// a user-defined protocol name.
ha.SetStreamHandler("/echo/1.0.0", func(s net.Stream) { ha.SetStreamHandler("/echo/1.0.0", func(s net.Stream) {
log.Println("Got a new stream!") log.Println("Got a new stream!")
defer s.Close() defer s.Close()
...@@ -93,8 +115,10 @@ func main() { ...@@ -93,8 +115,10 @@ func main() {
log.Println("listening for connections") log.Println("listening for connections")
select {} // hang forever select {} // hang forever
} }
// This is where the listener code ends /**** This is where the listener code ends ****/
// The following code extracts target's the peer ID from the
// given multiaddress
ipfsaddr, err := ma.NewMultiaddr(*target) ipfsaddr, err := ma.NewMultiaddr(*target)
if err != nil { if err != nil {
log.Fatalln(err) log.Fatalln(err)
...@@ -110,26 +134,26 @@ func main() { ...@@ -110,26 +134,26 @@ func main() {
log.Fatalln(err) log.Fatalln(err)
} }
tptaddr := strings.Split(ipfsaddr.String(), "/ipfs/")[0] // Decapsulate the /ipfs/<peerID> part from the target
// This creates a MA with the "/ip4/ipaddr/tcp/port" part of the target // /ip4/<a.b.c.d>/ipfs/<peer> becomes /ip4/<a.b.c.d>
tptmaddr, err := ma.NewMultiaddr(tptaddr) targetPeerAddr, _ := ma.NewMultiaddr(
if err != nil { fmt.Sprintf("/ipfs/%s", peer.IDB58Encode(peerid)))
log.Fatalln(err) targetAddr := ipfsaddr.Decapsulate(targetPeerAddr)
}
// We need to add the target to our peerstore, so we know how we can // We have a peer ID and a targetAddr so we add it to the peerstore
// contact it // so LibP2P knows how to contact it
ha.Peerstore().AddAddr(peerid, tptmaddr, pstore.PermanentAddrTTL) ha.Peerstore().AddAddr(peerid, targetAddr, peerstore.PermanentAddrTTL)
log.Println("opening stream") log.Println("opening stream")
// make a new stream from host B to host A // make a new stream from host B to host A
// it should be handled on host A by the handler we set above // it should be handled on host A by the handler we set above because
// we use the same /echo/1.0.0 protocol
s, err := ha.NewStream(context.Background(), peerid, "/echo/1.0.0") s, err := ha.NewStream(context.Background(), peerid, "/echo/1.0.0")
if err != nil { if err != nil {
log.Fatalln(err) log.Fatalln(err)
} }
_, err = s.Write([]byte("Hello, world!")) _, err = s.Write([]byte("Hello, world!\n"))
if err != nil { if err != nil {
log.Fatalln(err) log.Fatalln(err)
} }
...@@ -142,18 +166,17 @@ func main() { ...@@ -142,18 +166,17 @@ func main() {
log.Printf("read reply: %q\n", out) log.Printf("read reply: %q\n", out)
} }
// doEcho reads some data from a stream, writes it back and closes the // doEcho reads a line of data a stream and writes it back
// stream.
func doEcho(s net.Stream) { func doEcho(s net.Stream) {
buf := make([]byte, 1024) buf := bufio.NewReader(s)
n, err := s.Read(buf) str, err := buf.ReadString('\n')
if err != nil { if err != nil {
log.Println(err) log.Println(err)
return return
} }
log.Printf("read request: %q\n", buf[:n]) log.Printf("read: %s\n", str)
_, err = s.Write(buf[:n]) _, err = s.Write([]byte(str))
if err != nil { if err != nil {
log.Println(err) log.Println(err)
return return
......
# HTTP proxy service with libp2p
This examples shows how to create a simple HTTP proxy service with libp2p:
```
XXX
XX XXXXXX
X XX
XXXXXXX XX XX XXXXXXXXXX
+---------------------+ +---------------------+ XXX XXX XXX XXX
HTTP Request | | | | XX XX
+-------------------> | libp2p stream | | HTTP X X
| Local peer <------------------> Remote peer <-------------> HTTP SERVER - THE INTERNET XX
<-------------------+ | | | Req & Resp XX X
HTTP Response | libp2p host | | libp2p host | XXXX XXXX XXXXXXXXXXXXXXXXXXXX XXXX
+---------------------+ +---------------------+ XXXXX
```
In order to proxy an HTTP request, we create a local peer which listens on `localhost:9900`. HTTP requests performed to that address are tunneled via a libp2p stream to a remote peer, which then performs the HTTP requests and sends the response back to the local peer, which relays it
to the user.
Note that this is a very simple approach to a proxy, and does not perform any header management, nor supports HTTPS. The `proxy.go` code is thoroughly commeted, detailing what is happening in every step.
## Build
From `go-libp2p` base folder:
```
> make deps
> go build ./examples/echo
```
## Usage
First run the "remote" peer as follows. It will print a local peer address. If you would like to run this on a separate machine, please replace the IP accordingly:
```sh
$ ./http-proxy
Proxy server is ready
libp2p-peer addresses:
/ip4/127.0.0.1/tcp/12000/ipfs/QmddTrQXhA9AkCpXPTkcY7e22NK73TwkUms3a44DhTKJTD
```
The run the local peer, indicating that it will need to forward http requests to the remote peer as follows:
```
$ ./http-proxy -d /ip4/127.0.0.1/tcp/12000/ipfs/QmddTrQXhA9AkCpXPTkcY7e22NK73TwkUms3a44DhTKJTD
Proxy server is ready
libp2p-peer addresses:
/ip4/127.0.0.1/tcp/12001/ipfs/Qmaa2AYTha1UqcFVX97p9R1UP7vbzDLY7bqWsZw1135QvN
proxy listening on 127.0.0.1:9900
```
As you can see, the proxy prints the listening address `127.0.0.1:9900`. You can now use this address as proxy, for example with `curl`:
```
$ curl -x "127.0.0.1:9900" "http://ipfs.io/ipfs/QmfUX75pGRBRDnjeoMkQzuQczuCup2aYbeLxz5NzeSu9G6"
it works!
```
package main
import (
"bufio"
"context"
"flag"
"fmt"
"io"
"log"
"net/http"
"strings"
// We need to import libp2p's libraries that we use in this project.
// In order to work, these libraries need to be rewritten by gx-go.
crypto "github.com/libp2p/go-libp2p-crypto"
host "github.com/libp2p/go-libp2p-host"
inet "github.com/libp2p/go-libp2p-net"
peer "github.com/libp2p/go-libp2p-peer"
ps "github.com/libp2p/go-libp2p-peerstore"
swarm "github.com/libp2p/go-libp2p-swarm"
ma "github.com/multiformats/go-multiaddr"
manet "github.com/multiformats/go-multiaddr-net"
bhost "github.com/libp2p/go-libp2p/p2p/host/basic"
)
// Protocol defines the libp2p protocol that we will use for the libp2p proxy
// service that we are going to provide. This will tag the streams used for
// this service. Streams are multiplexed and their protocol tag helps
// libp2p handle them to the right handler functions.
const Protocol = "/proxy-example/0.0.1"
// makeRandomHost creates a libp2p host with a randomly generated identity.
// This step is described in depth in other tutorials.
func makeRandomHost(port int) host.Host {
priv, pub, err := crypto.GenerateKeyPair(crypto.RSA, 2048)
if err != nil {
log.Fatalln(err)
}
pid, err := peer.IDFromPublicKey(pub)
if err != nil {
log.Fatalln(err)
}
listen, err := ma.NewMultiaddr(fmt.Sprintf("/ip4/127.0.0.1/tcp/%d", port))
if err != nil {
log.Fatalln(err)
}
ps := ps.NewPeerstore()
ps.AddPrivKey(pid, priv)
ps.AddPubKey(pid, pub)
n, err := swarm.NewNetwork(context.Background(),
[]ma.Multiaddr{listen}, pid, ps, nil)
if err != nil {
log.Fatalln(err)
}
return bhost.New(n)
}
// ProxyService provides HTTP proxying on top of libp2p by launching an
// HTTP server which tunnels the requests to a destination peer running
// ProxyService too.
type ProxyService struct {
host host.Host
dest peer.ID
proxyAddr ma.Multiaddr
}
// NewProxyService attaches a proxy service to the given libp2p Host.
// The proxyAddr parameter specifies the address on which the
// HTTP proxy server listens. The dest parameter specifies the peer
// ID of the remote peer in charge of performing the HTTP requests.
//
// ProxyAddr/dest may be nil/"" it is not necessary that this host
// provides a listening HTTP server (and instead its only function is to
// perform the proxied http requests it receives from a different peer.
//
// The addresses for the dest peer should be part of the host's peerstore.
func NewProxyService(h host.Host, proxyAddr ma.Multiaddr, dest peer.ID) *ProxyService {
// We let our host know that it needs to handle streams tagged with the
// protocol id that we have defined, and then handle them to
// our own streamHandling function.
h.SetStreamHandler(Protocol, streamHandler)
fmt.Println("Proxy server is ready")
fmt.Println("libp2p-peer addresses:")
for _, a := range h.Addrs() {
fmt.Printf("%s/ipfs/%s\n", a, peer.IDB58Encode(h.ID()))
}
return &ProxyService{
host: h,
dest: dest,
proxyAddr: proxyAddr,
}
}
// streamHandler is our function to handle any libp2p-net streams that belong
// to our protocol. The streams should contain an HTTP request which we need
// to parse, make on behalf of the original node, and then write the response
// on the stream, before closing it.
func streamHandler(stream inet.Stream) {
// Remember to close the stream when we are done.
defer stream.Close()
// Create a new buffered reader, as ReadRequest needs one.
// The buffered reader reads from our stream, on which we
// have sent the HTTP request (see ServeHTTP())
buf := bufio.NewReader(stream)
// Read the HTTP request from the buffer
req, err := http.ReadRequest(buf)
if err != nil {
log.Println(err)
return
}
defer req.Body.Close()
// We need to reset these fields in the request
// URL as they are not maintained.
req.URL.Scheme = "http"
hp := strings.Split(req.Host, ":")
if len(hp) > 1 && hp[1] == "443" {
req.URL.Scheme = "https"
} else {
req.URL.Scheme = "http"
}
req.URL.Host = req.Host
outreq := new(http.Request)
*outreq = *req
// We now make the request
fmt.Printf("Making request to %s\n", req.URL)
resp, err := http.DefaultTransport.RoundTrip(outreq)
if err != nil {
log.Println(err)
return
}
// resp.Write writes whatever response we obtained for our
// request back to the stream.
resp.Write(stream)
}
// Serve listens on the ProxyService's proxy address. This effectively
// allows to set the listening address as http proxy.
func (p *ProxyService) Serve() {
_, serveArgs, _ := manet.DialArgs(p.proxyAddr)
fmt.Println("proxy listening on ", serveArgs)
if p.dest != "" {
http.ListenAndServe(serveArgs, p)
}
}
// ServeHTTP implements the http.Handler interface. WARNING: This is the
// simplest approach to a proxy. Therefore we do not do any of the things
// that should be done when implementing a reverse proxy (like handling
// headers correctly). For how to do it properly, see:
// https://golang.org/src/net/http/httputil/reverseproxy.go?s=3845:3920#L121
//
// ServeHTTP opens a stream to the dest peer for every HTTP request.
// Streams are multiplexed over single connections so, unlike connections
// themselves, they are cheap to create and dispose of.
func (p *ProxyService) ServeHTTP(w http.ResponseWriter, r *http.Request) {
fmt.Printf("proxying request for %s to peer %s\n", r.URL, p.dest.Pretty())
// We need to send the request to the remote libp2p peer, so
// we open a stream to it
stream, err := p.host.NewStream(context.Background(), p.dest, Protocol)
// If an error happens, we write an error for response.
if err != nil {
log.Println(err)
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
defer stream.Close()
// r.Write() writes the HTTP request to the stream.
err = r.Write(stream)
if err != nil {
log.Println(err)
http.Error(w, err.Error(), http.StatusServiceUnavailable)
return
}
// Now we read the response that was sent from the dest
// peer
buf := bufio.NewReader(stream)
resp, err := http.ReadResponse(buf, r)
if err != nil {
log.Println(err)
http.Error(w, err.Error(), http.StatusServiceUnavailable)
return
}
// Copy any headers
for k, v := range resp.Header {
for _, s := range v {
w.Header().Add(k, s)
}
}
// Write response status and headers
w.WriteHeader(resp.StatusCode)
// Finally copy the body
io.Copy(w, resp.Body)
resp.Body.Close()
}
// addAddrToPeerstore parses a peer multiaddress and adds
// it to the given host's peerstore, so it knows how to
// contact it. It returns the peer ID of the remote peer.
func addAddrToPeerstore(h host.Host, addr string) peer.ID {
// The following code extracts target's the peer ID from the
// given multiaddress
ipfsaddr, err := ma.NewMultiaddr(addr)
if err != nil {
log.Fatalln(err)
}
pid, err := ipfsaddr.ValueForProtocol(ma.P_IPFS)
if err != nil {
log.Fatalln(err)
}
peerid, err := peer.IDB58Decode(pid)
if err != nil {
log.Fatalln(err)
}
// Decapsulate the /ipfs/<peerID> part from the target
// /ip4/<a.b.c.d>/ipfs/<peer> becomes /ip4/<a.b.c.d>
targetPeerAddr, _ := ma.NewMultiaddr(
fmt.Sprintf("/ipfs/%s", peer.IDB58Encode(peerid)))
targetAddr := ipfsaddr.Decapsulate(targetPeerAddr)
// We have a peer ID and a targetAddr so we add
// it to the peerstore so LibP2P knows how to contact it
h.Peerstore().AddAddr(peerid, targetAddr, ps.PermanentAddrTTL)
return peerid
}
func main() {
flag.Usage = func() {
fmt.Println(`
This example creates a simple HTTP Proxy using two libp2p peers. The first peer
provides an HTTP server locally which tunnels the HTTP requests with libp2p
to a remote peer. The remote peer performs the requests and
send the sends the response back.
Usage: Start remote peer first with: ./proxy
Then start the local peer with: ./proxy -d <remote-peer-multiaddress>
Then you can do something like: curl -x "localhost:9900" "http://ipfs.io".
This proxies sends the request through the local peer, which proxies it to
the remote peer, which makes it and sends the response back.
`)
flag.PrintDefaults()
}
// Parse some flags
destPeer := flag.String("d", "", "destination peer address")
port := flag.Int("p", 9900, "proxy port")
p2pport := flag.Int("l", 12000, "libp2p listen port")
flag.Parse()
// If we have a destination peer we will start a local server
if *destPeer != "" {
// We use p2pport+1 in order to not collide if the user
// is running the remote peer locally on that port
host := makeRandomHost(*p2pport + 1)
// Make sure our host knows how to reach destPeer
destPeerID := addAddrToPeerstore(host, *destPeer)
proxyAddr, err := ma.NewMultiaddr(fmt.Sprintf("/ip4/127.0.0.1/tcp/%d", *port))
if err != nil {
log.Fatalln(err)
}
// Create the proxy service and start the http server
proxy := NewProxyService(host, proxyAddr, destPeerID)
proxy.Serve() // serve hangs forever
} else {
host := makeRandomHost(*p2pport)
// In this case we only need to make sure our host
// knows how to handle incoming proxied requests from
// another peer.
_ = NewProxyService(host, nil, "")
<-make(chan struct{}) // hang forever
}
}
# Using multicodecs with LibP2P
This examples shows how to use multicodecs (i.e. json) to encode and transmit information between LibP2P hosts using LibP2P Streams.
Multicodecs present a common interface, making it very easy to swap the codec implementation if needed.
This example expects that you area already familiar with the [echo example](https://github.com/libp2p/go-libp2p/tree/master/examples/echo).
## Build
From `go-libp2p` base folder:
```
> make deps-examples
> go build ./examples/multicodecs
```
## Usage
```
> ./multicodecs
```
## Details
The example creates two LibP2P Hosts. Host1 opens a stream to Host2. Host2 has an `StreamHandler` to deal with the incoming stream. This is covered in the `echo` example.
Both hosts simulate a conversation. But rather than sending raw messages on the stream, each message in the conversation is encoded under a `json` object (using the `json` multicodec). For example:
```
{
"Msg": "This is the message",
"Index": 3,
"HangUp": false
}
```
The stream lasts until one of the sides closes it when the HangUp field is `true`.
package main
import (
"bufio"
"context"
"fmt"
"log"
"math/rand"
"time"
crypto "github.com/libp2p/go-libp2p-crypto"
host "github.com/libp2p/go-libp2p-host"
inet "github.com/libp2p/go-libp2p-net"
peer "github.com/libp2p/go-libp2p-peer"
ps "github.com/libp2p/go-libp2p-peerstore"
swarm "github.com/libp2p/go-libp2p-swarm"
ma "github.com/multiformats/go-multiaddr"
bhost "github.com/libp2p/go-libp2p/p2p/host/basic"
multicodec "github.com/multiformats/go-multicodec"
json "github.com/multiformats/go-multicodec/json"
)
const proto = "/example/1.0.0"
// Message is a serializable/encodable object that we will send
// on a Stream.
type Message struct {
Msg string
Index int
HangUp bool
}
// streamWrap wraps a libp2p stream. We encode/decode whenever we
// write/read from a stream, so we can just carry the encoders
// and bufios with us
type WrappedStream struct {
stream inet.Stream
enc multicodec.Encoder
dec multicodec.Decoder
w *bufio.Writer
r *bufio.Reader
}
// wrapStream takes a stream and complements it with r/w bufios and
// decoder/encoder. In order to write raw data to the stream we can use
// wrap.w.Write(). To encode something into it we can wrap.enc.Encode().
// Finally, we should wrap.w.Flush() to actually send the data. Handling
// incoming data works similarly with wrap.r.Read() for raw-reading and
// wrap.dec.Decode() to decode.
func WrapStream(s inet.Stream) *WrappedStream {
reader := bufio.NewReader(s)
writer := bufio.NewWriter(s)
// This is where we pick our specific multicodec. In order to change the
// codec, we only need to change this place.
// See https://godoc.org/github.com/multiformats/go-multicodec/json
dec := json.Multicodec(false).Decoder(reader)
enc := json.Multicodec(false).Encoder(writer)
return &WrappedStream{
stream: s,
r: reader,
w: writer,
enc: enc,
dec: dec,
}
}
// messages that will be sent between the hosts.
var conversationMsgs = []string{
"Hello!",
"Hey!",
"How are you doing?",
"Very good! It is great that you can send data on a stream to me!",
"Not only that, the data is encoded in a JSON object.",
"Yeah, and we are using the multicodecs interface to encode and decode.",
"This way we could swap it easily for, say, cbor, or msgpack!",
"Let's leave that as an excercise for the reader...",
"Agreed, our last message should activate the HangUp flag",
"Yes, and the example code will close streams. So sad :(. Bye!",
}
func makeRandomHost(port int) host.Host {
// Ignoring most errors for brevity
// See echo example for more details and better implementation
priv, pub, _ := crypto.GenerateKeyPair(crypto.RSA, 2048)
pid, _ := peer.IDFromPublicKey(pub)
listen, _ := ma.NewMultiaddr(fmt.Sprintf("/ip4/127.0.0.1/tcp/%d", port))
ps := ps.NewPeerstore()
ps.AddPrivKey(pid, priv)
ps.AddPubKey(pid, pub)
n, _ := swarm.NewNetwork(context.Background(),
[]ma.Multiaddr{listen}, pid, ps, nil)
return bhost.New(n)
}
func main() {
// Choose random ports between 10000-10100
rand.Seed(666)
port1 := rand.Intn(100) + 10000
port2 := port1 + 1
// Make 2 hosts
h1 := makeRandomHost(port1)
h2 := makeRandomHost(port2)
h1.Peerstore().AddAddrs(h2.ID(), h2.Addrs(), ps.PermanentAddrTTL)
h2.Peerstore().AddAddrs(h1.ID(), h1.Addrs(), ps.PermanentAddrTTL)
log.Printf("This is a conversation between %s and %s\n", h1.ID(), h2.ID())
// Define a stream handler for host number 2
h2.SetStreamHandler(proto, func(stream inet.Stream) {
log.Printf("%s: Received a stream", h2.ID())
wrappedStream := WrapStream(stream)
defer stream.Close()
handleStream(wrappedStream)
})
// Create new stream from h1 to h2 and start the conversation
stream, err := h1.NewStream(context.Background(), h2.ID(), proto)
if err != nil {
log.Fatal(err)
}
wrappedStream := WrapStream(stream)
// This sends the first message
sendMessage(0, wrappedStream)
// We keep the conversation on the created stream so we launch
// this to handle any responses
handleStream(wrappedStream)
// When we are done, close the stream on our side and exit.
stream.Close()
}
// receiveMessage reads and decodes a message from the stream
func receiveMessage(ws *WrappedStream) (*Message, error) {
var msg Message
err := ws.dec.Decode(&msg)
if err != nil {
return nil, err
}
return &msg, nil
}
// sendMessage encodes and writes a message to the stream
func sendMessage(index int, ws *WrappedStream) error {
msg := &Message{
Msg: conversationMsgs[index],
Index: index,
HangUp: index >= len(conversationMsgs)-1,
}
err := ws.enc.Encode(msg)
// Because output is buffered with bufio, we need to flush!
ws.w.Flush()
return err
}
// handleStream is a for loop which receives and then sends a message
// an artificial delay of 500ms happens in-between.
// When Message.HangUp is true, it exists. This will close the stream
// on one of the sides. The other side's receiveMessage() will error
// with EOF, thus also breaking out from the loop.
func handleStream(ws *WrappedStream) {
for {
// Read
msg, err := receiveMessage(ws)
if err != nil {
break
}
pid := ws.stream.Conn().LocalPeer()
log.Printf("%s says: %s\n", pid, msg.Msg)
time.Sleep(500 * time.Millisecond)
if msg.HangUp {
break
}
// Send response
err = sendMessage(msg.Index+1, ws)
if err != nil {
break
}
}
}
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment