diff --git a/.gitignore b/.gitignore index 3fe82efc817a1332217cfc9a2ff3a6b8e290da14..cd7b1ceab449fd773f54ce34c3bb998c63da400c 100644 --- a/.gitignore +++ b/.gitignore @@ -1,2 +1,3 @@ *.swp examples/echo/echo +examples/multicodecs/multicodecs diff --git a/.travis.yml b/.travis.yml index 22556c243950f45d86507f581f6ce65d61166f06..386f384fc86cef11db87a5bd1e70a90aade51a5f 100644 --- a/.travis.yml +++ b/.travis.yml @@ -11,7 +11,8 @@ install: true before_install: - make deps - + - go get -u github.com/multiformats/go-multicodec + - go get -u github.com/jbenet/go-msgio #- go vet ./... script: - go test ./... -v diff --git a/Makefile b/Makefile index 0344faddd13b198ef7284dc7354ff3db8676ef6d..910bcd34dacd3af560a6b6f0808ee03966b1093a 100644 --- a/Makefile +++ b/Makefile @@ -2,6 +2,10 @@ gx: go get github.com/whyrusleeping/gx go get github.com/whyrusleeping/gx-go +deps-examples: deps + go get -u github.com/multiformats/go-multicodec + go get -u github.com/jbenet/go-msgio + deps: gx gx --verbose install --global gx-go rewrite diff --git a/examples/echo/README.md b/examples/echo/README.md index f58b9cace794d015ce9b4a31034e1214e79621b6..a83927864ae458a10d05afa9ac6b1c8330c90ec5 100644 --- a/examples/echo/README.md +++ b/examples/echo/README.md @@ -25,18 +25,19 @@ From `go-libp2p` base folder: ## Usage ``` -> ./echo -l 1235 -2016/11/10 10:45:37 I am /ip4/127.0.0.1/tcp/1235/ipfs/QmNtX1cvrm2K6mQmMEaMxAuB4rTexhd87vpYVot4sEZzxc -2016/11/10 10:45:37 listening for connections +> ./echo -l 10000 +2017/03/15 14:11:32 I am /ip4/127.0.0.1/tcp/10000/ipfs/QmYo41GybvrXk8y8Xnm1P7pfA4YEXCpfnLyzgRPnNbG35e +2017/03/15 14:11:32 Now run "./echo -l 10001 -d /ip4/127.0.0.1/tcp/10000/ipfs/QmYo41GybvrXk8y8Xnm1P7pfA4YEXCpfnLyzgRPnNbG35e -secio" on a different terminal +2017/03/15 14:11:32 listening for connections ``` The listener libp2p host will print its `Multiaddress`, which indicates how it -can be reached (ip4+tcp) and its randomly generated ID (`QmNtX1cv...`) +can be reached (ip4+tcp) and its randomly generated ID (`QmYo41Gyb...`) Now, launch another node that talks to the listener: ``` -> ./echo -d /ip4/127.0.0.1/tcp/1235/ipfs/QmNtX1cvrm2K6mQmMEaMxAuB4rTexhd87vpYVot4sEZzxc -l 1236 +> ./echo -l 10001 -d /ip4/127.0.0.1/tcp/10000/ipfs/QmYo41GybvrXk8y8Xnm1P7pfA4YEXCpfnLyzgRPnNbG35e ``` The new node with send the message `"Hello, world!"` to the @@ -61,9 +62,9 @@ In order to create the swarm (and a `basichost`), the example needs: * An [ipfs-procotol ID](https://godoc.org/github.com/libp2p/go-libp2p-peer#ID) like `QmNtX1cvrm2K6mQmMEaMxAuB4rTexhd87vpYVot4sEZzxc`. The example - autogenerates this on every run. An optional key-pair to secure - communications can be added to it. The example autogenerates them when - using `-secio`. + autogenerates a key pair on every run and uses an ID extracted from the + public key (the hash of the public key). When using `-secio`, it uses + the key pair to encrypt communications. * A [Multiaddress](https://godoc.org/github.com/multiformats/go-multiaddr), which indicates how to reach this peer. There can be several of them (using different protocols or locations for example). Example: diff --git a/examples/echo/main.go b/examples/echo/main.go index 022a1a446ae72028d0f19d0fb3cbc2e6ea7643e2..26253fcf69a2cd4b804c90d8d98eef2f1f05770f 100644 --- a/examples/echo/main.go +++ b/examples/echo/main.go @@ -1,88 +1,110 @@ package main import ( + "bufio" "context" "flag" "fmt" "io/ioutil" "log" - "math/rand" - "strings" - "time" golog "github.com/ipfs/go-log" + crypto "github.com/libp2p/go-libp2p-crypto" host "github.com/libp2p/go-libp2p-host" net "github.com/libp2p/go-libp2p-net" peer "github.com/libp2p/go-libp2p-peer" pstore "github.com/libp2p/go-libp2p-peerstore" swarm "github.com/libp2p/go-libp2p-swarm" - bhost "github.com/libp2p/go-libp2p/p2p/host/basic" - testutil "github.com/libp2p/go-testutil" ma "github.com/multiformats/go-multiaddr" gologging "github.com/whyrusleeping/go-logging" + + peerstore "github.com/libp2p/go-libp2p-peerstore" + bhost "github.com/libp2p/go-libp2p/p2p/host/basic" ) -// create a 'Host' with a random peer to listen on the given address -func makeBasicHost(listen string, secio bool) (host.Host, error) { - addr, err := ma.NewMultiaddr(listen) +// makeBasicHost creates a LibP2P host with a random peer ID listening on the +// given multiaddress. It will use secio if secio is true. +func makeBasicHost(listenPort int, secio bool) (host.Host, error) { + // Generate a key pair for this host. We will use it at least + // to obtain a valid host ID. + priv, pub, err := crypto.GenerateKeyPair(crypto.RSA, 2048) + if err != nil { + return nil, err + } + + // Obtain Peer ID from public key + pid, err := peer.IDFromPublicKey(pub) if err != nil { return nil, err } + // Create a multiaddress + addr, err := ma.NewMultiaddr(fmt.Sprintf("/ip4/127.0.0.1/tcp/%d", listenPort)) + + if err != nil { + return nil, err + } + + // Create a peerstore ps := pstore.NewPeerstore() - var pid peer.ID + // If using secio, we add the keys to the peerstore + // for this peer ID. if secio { - ident, err := testutil.RandIdentity() - if err != nil { - return nil, err - } - - ident.PrivateKey() - ps.AddPrivKey(ident.ID(), ident.PrivateKey()) - ps.AddPubKey(ident.ID(), ident.PublicKey()) - pid = ident.ID() - } else { - fakepid, err := testutil.RandPeerID() - if err != nil { - return nil, err - } - pid = fakepid + ps.AddPrivKey(pid, priv) + ps.AddPubKey(pid, pub) } - ctx := context.Background() + // Create swarm (implements libP2P Network) + netwrk, err := swarm.NewNetwork( + context.Background(), + []ma.Multiaddr{addr}, + pid, + ps, + nil) - // create a new swarm to be used by the service host - netw, err := swarm.NewNetwork(ctx, []ma.Multiaddr{addr}, pid, ps, nil) - if err != nil { - return nil, err + basicHost := bhost.New(netwrk) + + // Build host multiaddress + hostAddr, _ := ma.NewMultiaddr(fmt.Sprintf("/ipfs/%s", basicHost.ID().Pretty())) + + // Now we can build a full multiaddress to reach this host + // by encapsulating both addresses: + fullAddr := addr.Encapsulate(hostAddr) + log.Printf("I am %s\n", fullAddr) + if secio { + log.Printf("Now run \"./echo -l %d -d %s -secio\" on a different terminal\n", listenPort+1, fullAddr) + } else { + log.Printf("Now run \"./echo -l %d -d %s\" on a different terminal\n", listenPort+1, fullAddr) } - log.Printf("I am %s/ipfs/%s\n", addr, pid.Pretty()) - return bhost.New(netw), nil + return basicHost, nil } func main() { - rand.Seed(time.Now().UnixNano()) + // LibP2P code uses golog to log messages. They log with different + // string IDs (i.e. "swarm"). We can control the verbosity level for + // all loggers with: golog.SetAllLoggers(gologging.INFO) // Change to DEBUG for extra info + + // Parse options from the command line listenF := flag.Int("l", 0, "wait for incoming connections") target := flag.String("d", "", "target peer to dial") secio := flag.Bool("secio", false, "enable secio") - flag.Parse() if *listenF == 0 { log.Fatal("Please provide a port to bind on with -l") } - listenaddr := fmt.Sprintf("/ip4/127.0.0.1/tcp/%d", *listenF) - - ha, err := makeBasicHost(listenaddr, *secio) + // Make a host that listens on the given multiaddress + ha, err := makeBasicHost(*listenF, *secio) if err != nil { log.Fatal(err) } - // Set a stream handler on host A + // Set a stream handler on host A. /echo/1.0.0 is + // a user-defined protocol name. ha.SetStreamHandler("/echo/1.0.0", func(s net.Stream) { log.Println("Got a new stream!") defer s.Close() @@ -93,8 +115,10 @@ func main() { log.Println("listening for connections") select {} // hang forever } - // This is where the listener code ends + /**** This is where the listener code ends ****/ + // The following code extracts target's the peer ID from the + // given multiaddress ipfsaddr, err := ma.NewMultiaddr(*target) if err != nil { log.Fatalln(err) @@ -110,26 +134,26 @@ func main() { log.Fatalln(err) } - tptaddr := strings.Split(ipfsaddr.String(), "/ipfs/")[0] - // This creates a MA with the "/ip4/ipaddr/tcp/port" part of the target - tptmaddr, err := ma.NewMultiaddr(tptaddr) - if err != nil { - log.Fatalln(err) - } + // Decapsulate the /ipfs/ part from the target + // /ip4//ipfs/ becomes /ip4/ + targetPeerAddr, _ := ma.NewMultiaddr( + fmt.Sprintf("/ipfs/%s", peer.IDB58Encode(peerid))) + targetAddr := ipfsaddr.Decapsulate(targetPeerAddr) - // We need to add the target to our peerstore, so we know how we can - // contact it - ha.Peerstore().AddAddr(peerid, tptmaddr, pstore.PermanentAddrTTL) + // We have a peer ID and a targetAddr so we add it to the peerstore + // so LibP2P knows how to contact it + ha.Peerstore().AddAddr(peerid, targetAddr, peerstore.PermanentAddrTTL) log.Println("opening stream") // make a new stream from host B to host A - // it should be handled on host A by the handler we set above + // it should be handled on host A by the handler we set above because + // we use the same /echo/1.0.0 protocol s, err := ha.NewStream(context.Background(), peerid, "/echo/1.0.0") if err != nil { log.Fatalln(err) } - _, err = s.Write([]byte("Hello, world!")) + _, err = s.Write([]byte("Hello, world!\n")) if err != nil { log.Fatalln(err) } @@ -142,18 +166,17 @@ func main() { log.Printf("read reply: %q\n", out) } -// doEcho reads some data from a stream, writes it back and closes the -// stream. +// doEcho reads a line of data a stream and writes it back func doEcho(s net.Stream) { - buf := make([]byte, 1024) - n, err := s.Read(buf) + buf := bufio.NewReader(s) + str, err := buf.ReadString('\n') if err != nil { log.Println(err) return } - log.Printf("read request: %q\n", buf[:n]) - _, err = s.Write(buf[:n]) + log.Printf("read: %s\n", str) + _, err = s.Write([]byte(str)) if err != nil { log.Println(err) return diff --git a/examples/http-proxy/README.md b/examples/http-proxy/README.md new file mode 100644 index 0000000000000000000000000000000000000000..858b51886a5eebc7d8b10231103cf279618a4995 --- /dev/null +++ b/examples/http-proxy/README.md @@ -0,0 +1,62 @@ +# HTTP proxy service with libp2p + +This examples shows how to create a simple HTTP proxy service with libp2p: + + +``` + XXX + XX XXXXXX + X XX + XXXXXXX XX XX XXXXXXXXXX + +---------------------+ +---------------------+ XXX XXX XXX XXX + HTTP Request | | | | XX XX ++-------------------> | libp2p stream | | HTTP X X + | Local peer <------------------> Remote peer <-------------> HTTP SERVER - THE INTERNET XX +<-------------------+ | | | Req & Resp XX X + HTTP Response | libp2p host | | libp2p host | XXXX XXXX XXXXXXXXXXXXXXXXXXXX XXXX + +---------------------+ +---------------------+ XXXXX + +``` + +In order to proxy an HTTP request, we create a local peer which listens on `localhost:9900`. HTTP requests performed to that address are tunneled via a libp2p stream to a remote peer, which then performs the HTTP requests and sends the response back to the local peer, which relays it +to the user. + +Note that this is a very simple approach to a proxy, and does not perform any header management, nor supports HTTPS. The `proxy.go` code is thoroughly commeted, detailing what is happening in every step. + +## Build + +From `go-libp2p` base folder: + +``` +> make deps +> go build ./examples/echo +``` + +## Usage + +First run the "remote" peer as follows. It will print a local peer address. If you would like to run this on a separate machine, please replace the IP accordingly: + +```sh +$ ./http-proxy +Proxy server is ready +libp2p-peer addresses: +/ip4/127.0.0.1/tcp/12000/ipfs/QmddTrQXhA9AkCpXPTkcY7e22NK73TwkUms3a44DhTKJTD +``` + +The run the local peer, indicating that it will need to forward http requests to the remote peer as follows: + +``` +$ ./http-proxy -d /ip4/127.0.0.1/tcp/12000/ipfs/QmddTrQXhA9AkCpXPTkcY7e22NK73TwkUms3a44DhTKJTD +Proxy server is ready +libp2p-peer addresses: +/ip4/127.0.0.1/tcp/12001/ipfs/Qmaa2AYTha1UqcFVX97p9R1UP7vbzDLY7bqWsZw1135QvN +proxy listening on 127.0.0.1:9900 +``` + +As you can see, the proxy prints the listening address `127.0.0.1:9900`. You can now use this address as proxy, for example with `curl`: + +``` +$ curl -x "127.0.0.1:9900" "http://ipfs.io/ipfs/QmfUX75pGRBRDnjeoMkQzuQczuCup2aYbeLxz5NzeSu9G6" +it works! + +``` diff --git a/examples/http-proxy/proxy.go b/examples/http-proxy/proxy.go new file mode 100644 index 0000000000000000000000000000000000000000..235894865a4771990431f762b4b5e2f0f9e243a6 --- /dev/null +++ b/examples/http-proxy/proxy.go @@ -0,0 +1,288 @@ +package main + +import ( + "bufio" + "context" + "flag" + "fmt" + "io" + "log" + "net/http" + "strings" + + // We need to import libp2p's libraries that we use in this project. + // In order to work, these libraries need to be rewritten by gx-go. + crypto "github.com/libp2p/go-libp2p-crypto" + host "github.com/libp2p/go-libp2p-host" + inet "github.com/libp2p/go-libp2p-net" + peer "github.com/libp2p/go-libp2p-peer" + ps "github.com/libp2p/go-libp2p-peerstore" + swarm "github.com/libp2p/go-libp2p-swarm" + ma "github.com/multiformats/go-multiaddr" + manet "github.com/multiformats/go-multiaddr-net" + + bhost "github.com/libp2p/go-libp2p/p2p/host/basic" +) + +// Protocol defines the libp2p protocol that we will use for the libp2p proxy +// service that we are going to provide. This will tag the streams used for +// this service. Streams are multiplexed and their protocol tag helps +// libp2p handle them to the right handler functions. +const Protocol = "/proxy-example/0.0.1" + +// makeRandomHost creates a libp2p host with a randomly generated identity. +// This step is described in depth in other tutorials. +func makeRandomHost(port int) host.Host { + priv, pub, err := crypto.GenerateKeyPair(crypto.RSA, 2048) + if err != nil { + log.Fatalln(err) + } + pid, err := peer.IDFromPublicKey(pub) + if err != nil { + log.Fatalln(err) + } + listen, err := ma.NewMultiaddr(fmt.Sprintf("/ip4/127.0.0.1/tcp/%d", port)) + if err != nil { + log.Fatalln(err) + } + ps := ps.NewPeerstore() + ps.AddPrivKey(pid, priv) + ps.AddPubKey(pid, pub) + n, err := swarm.NewNetwork(context.Background(), + []ma.Multiaddr{listen}, pid, ps, nil) + if err != nil { + log.Fatalln(err) + } + return bhost.New(n) +} + +// ProxyService provides HTTP proxying on top of libp2p by launching an +// HTTP server which tunnels the requests to a destination peer running +// ProxyService too. +type ProxyService struct { + host host.Host + dest peer.ID + proxyAddr ma.Multiaddr +} + +// NewProxyService attaches a proxy service to the given libp2p Host. +// The proxyAddr parameter specifies the address on which the +// HTTP proxy server listens. The dest parameter specifies the peer +// ID of the remote peer in charge of performing the HTTP requests. +// +// ProxyAddr/dest may be nil/"" it is not necessary that this host +// provides a listening HTTP server (and instead its only function is to +// perform the proxied http requests it receives from a different peer. +// +// The addresses for the dest peer should be part of the host's peerstore. +func NewProxyService(h host.Host, proxyAddr ma.Multiaddr, dest peer.ID) *ProxyService { + // We let our host know that it needs to handle streams tagged with the + // protocol id that we have defined, and then handle them to + // our own streamHandling function. + h.SetStreamHandler(Protocol, streamHandler) + + fmt.Println("Proxy server is ready") + fmt.Println("libp2p-peer addresses:") + for _, a := range h.Addrs() { + fmt.Printf("%s/ipfs/%s\n", a, peer.IDB58Encode(h.ID())) + } + + return &ProxyService{ + host: h, + dest: dest, + proxyAddr: proxyAddr, + } +} + +// streamHandler is our function to handle any libp2p-net streams that belong +// to our protocol. The streams should contain an HTTP request which we need +// to parse, make on behalf of the original node, and then write the response +// on the stream, before closing it. +func streamHandler(stream inet.Stream) { + // Remember to close the stream when we are done. + defer stream.Close() + + // Create a new buffered reader, as ReadRequest needs one. + // The buffered reader reads from our stream, on which we + // have sent the HTTP request (see ServeHTTP()) + buf := bufio.NewReader(stream) + // Read the HTTP request from the buffer + req, err := http.ReadRequest(buf) + if err != nil { + log.Println(err) + return + } + defer req.Body.Close() + + // We need to reset these fields in the request + // URL as they are not maintained. + req.URL.Scheme = "http" + hp := strings.Split(req.Host, ":") + if len(hp) > 1 && hp[1] == "443" { + req.URL.Scheme = "https" + } else { + req.URL.Scheme = "http" + } + req.URL.Host = req.Host + + outreq := new(http.Request) + *outreq = *req + + // We now make the request + fmt.Printf("Making request to %s\n", req.URL) + resp, err := http.DefaultTransport.RoundTrip(outreq) + if err != nil { + log.Println(err) + return + } + + // resp.Write writes whatever response we obtained for our + // request back to the stream. + resp.Write(stream) +} + +// Serve listens on the ProxyService's proxy address. This effectively +// allows to set the listening address as http proxy. +func (p *ProxyService) Serve() { + _, serveArgs, _ := manet.DialArgs(p.proxyAddr) + fmt.Println("proxy listening on ", serveArgs) + if p.dest != "" { + http.ListenAndServe(serveArgs, p) + } +} + +// ServeHTTP implements the http.Handler interface. WARNING: This is the +// simplest approach to a proxy. Therefore we do not do any of the things +// that should be done when implementing a reverse proxy (like handling +// headers correctly). For how to do it properly, see: +// https://golang.org/src/net/http/httputil/reverseproxy.go?s=3845:3920#L121 +// +// ServeHTTP opens a stream to the dest peer for every HTTP request. +// Streams are multiplexed over single connections so, unlike connections +// themselves, they are cheap to create and dispose of. +func (p *ProxyService) ServeHTTP(w http.ResponseWriter, r *http.Request) { + fmt.Printf("proxying request for %s to peer %s\n", r.URL, p.dest.Pretty()) + // We need to send the request to the remote libp2p peer, so + // we open a stream to it + stream, err := p.host.NewStream(context.Background(), p.dest, Protocol) + // If an error happens, we write an error for response. + if err != nil { + log.Println(err) + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + defer stream.Close() + + // r.Write() writes the HTTP request to the stream. + err = r.Write(stream) + if err != nil { + log.Println(err) + http.Error(w, err.Error(), http.StatusServiceUnavailable) + return + } + + // Now we read the response that was sent from the dest + // peer + buf := bufio.NewReader(stream) + resp, err := http.ReadResponse(buf, r) + if err != nil { + log.Println(err) + http.Error(w, err.Error(), http.StatusServiceUnavailable) + return + } + + // Copy any headers + for k, v := range resp.Header { + for _, s := range v { + w.Header().Add(k, s) + } + } + + // Write response status and headers + w.WriteHeader(resp.StatusCode) + + // Finally copy the body + io.Copy(w, resp.Body) + resp.Body.Close() +} + +// addAddrToPeerstore parses a peer multiaddress and adds +// it to the given host's peerstore, so it knows how to +// contact it. It returns the peer ID of the remote peer. +func addAddrToPeerstore(h host.Host, addr string) peer.ID { + // The following code extracts target's the peer ID from the + // given multiaddress + ipfsaddr, err := ma.NewMultiaddr(addr) + if err != nil { + log.Fatalln(err) + } + pid, err := ipfsaddr.ValueForProtocol(ma.P_IPFS) + if err != nil { + log.Fatalln(err) + } + + peerid, err := peer.IDB58Decode(pid) + if err != nil { + log.Fatalln(err) + } + + // Decapsulate the /ipfs/ part from the target + // /ip4//ipfs/ becomes /ip4/ + targetPeerAddr, _ := ma.NewMultiaddr( + fmt.Sprintf("/ipfs/%s", peer.IDB58Encode(peerid))) + targetAddr := ipfsaddr.Decapsulate(targetPeerAddr) + + // We have a peer ID and a targetAddr so we add + // it to the peerstore so LibP2P knows how to contact it + h.Peerstore().AddAddr(peerid, targetAddr, ps.PermanentAddrTTL) + return peerid +} + +func main() { + flag.Usage = func() { + fmt.Println(` +This example creates a simple HTTP Proxy using two libp2p peers. The first peer +provides an HTTP server locally which tunnels the HTTP requests with libp2p +to a remote peer. The remote peer performs the requests and +send the sends the response back. + +Usage: Start remote peer first with: ./proxy + Then start the local peer with: ./proxy -d + +Then you can do something like: curl -x "localhost:9900" "http://ipfs.io". +This proxies sends the request through the local peer, which proxies it to +the remote peer, which makes it and sends the response back. +`) + flag.PrintDefaults() + } + + // Parse some flags + destPeer := flag.String("d", "", "destination peer address") + port := flag.Int("p", 9900, "proxy port") + p2pport := flag.Int("l", 12000, "libp2p listen port") + flag.Parse() + + // If we have a destination peer we will start a local server + if *destPeer != "" { + // We use p2pport+1 in order to not collide if the user + // is running the remote peer locally on that port + host := makeRandomHost(*p2pport + 1) + // Make sure our host knows how to reach destPeer + destPeerID := addAddrToPeerstore(host, *destPeer) + proxyAddr, err := ma.NewMultiaddr(fmt.Sprintf("/ip4/127.0.0.1/tcp/%d", *port)) + if err != nil { + log.Fatalln(err) + } + // Create the proxy service and start the http server + proxy := NewProxyService(host, proxyAddr, destPeerID) + proxy.Serve() // serve hangs forever + } else { + host := makeRandomHost(*p2pport) + // In this case we only need to make sure our host + // knows how to handle incoming proxied requests from + // another peer. + _ = NewProxyService(host, nil, "") + <-make(chan struct{}) // hang forever + } + +} diff --git a/examples/multicodecs/README.md b/examples/multicodecs/README.md new file mode 100644 index 0000000000000000000000000000000000000000..bd1229f4929c2bae84362741ff46a5b9046a7313 --- /dev/null +++ b/examples/multicodecs/README.md @@ -0,0 +1,39 @@ +# Using multicodecs with LibP2P + +This examples shows how to use multicodecs (i.e. json) to encode and transmit information between LibP2P hosts using LibP2P Streams. + +Multicodecs present a common interface, making it very easy to swap the codec implementation if needed. + +This example expects that you area already familiar with the [echo example](https://github.com/libp2p/go-libp2p/tree/master/examples/echo). + +## Build + +From `go-libp2p` base folder: + +``` +> make deps-examples +> go build ./examples/multicodecs +``` + +## Usage + +``` +> ./multicodecs + +``` + +## Details + +The example creates two LibP2P Hosts. Host1 opens a stream to Host2. Host2 has an `StreamHandler` to deal with the incoming stream. This is covered in the `echo` example. + +Both hosts simulate a conversation. But rather than sending raw messages on the stream, each message in the conversation is encoded under a `json` object (using the `json` multicodec). For example: + +``` +{ + "Msg": "This is the message", + "Index": 3, + "HangUp": false +} +``` + +The stream lasts until one of the sides closes it when the HangUp field is `true`. diff --git a/examples/multicodecs/main.go b/examples/multicodecs/main.go new file mode 100644 index 0000000000000000000000000000000000000000..86c069b29c61e76d924b577309dab6adc11969e3 --- /dev/null +++ b/examples/multicodecs/main.go @@ -0,0 +1,181 @@ +package main + +import ( + "bufio" + "context" + "fmt" + "log" + "math/rand" + "time" + + crypto "github.com/libp2p/go-libp2p-crypto" + host "github.com/libp2p/go-libp2p-host" + inet "github.com/libp2p/go-libp2p-net" + peer "github.com/libp2p/go-libp2p-peer" + ps "github.com/libp2p/go-libp2p-peerstore" + swarm "github.com/libp2p/go-libp2p-swarm" + ma "github.com/multiformats/go-multiaddr" + + bhost "github.com/libp2p/go-libp2p/p2p/host/basic" + multicodec "github.com/multiformats/go-multicodec" + json "github.com/multiformats/go-multicodec/json" +) + +const proto = "/example/1.0.0" + +// Message is a serializable/encodable object that we will send +// on a Stream. +type Message struct { + Msg string + Index int + HangUp bool +} + +// streamWrap wraps a libp2p stream. We encode/decode whenever we +// write/read from a stream, so we can just carry the encoders +// and bufios with us +type WrappedStream struct { + stream inet.Stream + enc multicodec.Encoder + dec multicodec.Decoder + w *bufio.Writer + r *bufio.Reader +} + +// wrapStream takes a stream and complements it with r/w bufios and +// decoder/encoder. In order to write raw data to the stream we can use +// wrap.w.Write(). To encode something into it we can wrap.enc.Encode(). +// Finally, we should wrap.w.Flush() to actually send the data. Handling +// incoming data works similarly with wrap.r.Read() for raw-reading and +// wrap.dec.Decode() to decode. +func WrapStream(s inet.Stream) *WrappedStream { + reader := bufio.NewReader(s) + writer := bufio.NewWriter(s) + // This is where we pick our specific multicodec. In order to change the + // codec, we only need to change this place. + // See https://godoc.org/github.com/multiformats/go-multicodec/json + dec := json.Multicodec(false).Decoder(reader) + enc := json.Multicodec(false).Encoder(writer) + return &WrappedStream{ + stream: s, + r: reader, + w: writer, + enc: enc, + dec: dec, + } +} + +// messages that will be sent between the hosts. +var conversationMsgs = []string{ + "Hello!", + "Hey!", + "How are you doing?", + "Very good! It is great that you can send data on a stream to me!", + "Not only that, the data is encoded in a JSON object.", + "Yeah, and we are using the multicodecs interface to encode and decode.", + "This way we could swap it easily for, say, cbor, or msgpack!", + "Let's leave that as an excercise for the reader...", + "Agreed, our last message should activate the HangUp flag", + "Yes, and the example code will close streams. So sad :(. Bye!", +} + +func makeRandomHost(port int) host.Host { + // Ignoring most errors for brevity + // See echo example for more details and better implementation + priv, pub, _ := crypto.GenerateKeyPair(crypto.RSA, 2048) + pid, _ := peer.IDFromPublicKey(pub) + listen, _ := ma.NewMultiaddr(fmt.Sprintf("/ip4/127.0.0.1/tcp/%d", port)) + ps := ps.NewPeerstore() + ps.AddPrivKey(pid, priv) + ps.AddPubKey(pid, pub) + n, _ := swarm.NewNetwork(context.Background(), + []ma.Multiaddr{listen}, pid, ps, nil) + return bhost.New(n) +} + +func main() { + // Choose random ports between 10000-10100 + rand.Seed(666) + port1 := rand.Intn(100) + 10000 + port2 := port1 + 1 + + // Make 2 hosts + h1 := makeRandomHost(port1) + h2 := makeRandomHost(port2) + h1.Peerstore().AddAddrs(h2.ID(), h2.Addrs(), ps.PermanentAddrTTL) + h2.Peerstore().AddAddrs(h1.ID(), h1.Addrs(), ps.PermanentAddrTTL) + + log.Printf("This is a conversation between %s and %s\n", h1.ID(), h2.ID()) + + // Define a stream handler for host number 2 + h2.SetStreamHandler(proto, func(stream inet.Stream) { + log.Printf("%s: Received a stream", h2.ID()) + wrappedStream := WrapStream(stream) + defer stream.Close() + handleStream(wrappedStream) + }) + + // Create new stream from h1 to h2 and start the conversation + stream, err := h1.NewStream(context.Background(), h2.ID(), proto) + if err != nil { + log.Fatal(err) + } + wrappedStream := WrapStream(stream) + // This sends the first message + sendMessage(0, wrappedStream) + // We keep the conversation on the created stream so we launch + // this to handle any responses + handleStream(wrappedStream) + // When we are done, close the stream on our side and exit. + stream.Close() +} + +// receiveMessage reads and decodes a message from the stream +func receiveMessage(ws *WrappedStream) (*Message, error) { + var msg Message + err := ws.dec.Decode(&msg) + if err != nil { + return nil, err + } + return &msg, nil +} + +// sendMessage encodes and writes a message to the stream +func sendMessage(index int, ws *WrappedStream) error { + msg := &Message{ + Msg: conversationMsgs[index], + Index: index, + HangUp: index >= len(conversationMsgs)-1, + } + + err := ws.enc.Encode(msg) + // Because output is buffered with bufio, we need to flush! + ws.w.Flush() + return err +} + +// handleStream is a for loop which receives and then sends a message +// an artificial delay of 500ms happens in-between. +// When Message.HangUp is true, it exists. This will close the stream +// on one of the sides. The other side's receiveMessage() will error +// with EOF, thus also breaking out from the loop. +func handleStream(ws *WrappedStream) { + for { + // Read + msg, err := receiveMessage(ws) + if err != nil { + break + } + pid := ws.stream.Conn().LocalPeer() + log.Printf("%s says: %s\n", pid, msg.Msg) + time.Sleep(500 * time.Millisecond) + if msg.HangUp { + break + } + // Send response + err = sendMessage(msg.Index+1, ws) + if err != nil { + break + } + } +}