// Package swarm implements a connection muxer with a pair of channels
// to synchronize all network communication.
package swarm

import (
	"fmt"
	"sync"

	inet "github.com/jbenet/go-ipfs/p2p/net"
	addrutil "github.com/jbenet/go-ipfs/p2p/net/swarm/addr"
	peer "github.com/jbenet/go-ipfs/p2p/peer"
	eventlog "github.com/jbenet/go-ipfs/util/eventlog"

	context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
	ctxgroup "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-ctxgroup"
	ma "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr"
	ps "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-peerstream"
	psy "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-peerstream/transport/yamux"
)

// log is the package-level event logger, registered under the name "swarm2".
var log = eventlog.Logger("swarm2")

// PSTransport is the peerstream transport that NewSwarm passes to
// ps.NewSwarm. It is initialized to psy.DefaultTransport.
var PSTransport = psy.DefaultTransport

// Swarm is a connection muxer, allowing connections to other peers to
// be opened and closed, while still using the same Chan for all
// communication. The Chan sends/receives Messages, which note the
// destination or source Peer.
//
// NOTE(review): the "Chan" and "Messages" wording above does not match any
// field of this struct or any method visible in this file -- it may predate
// the peerstream-based implementation. Confirm and trim.
//
// Uses peerstream.Swarm
type Swarm struct {
	swarm *ps.Swarm     // underlying peerstream swarm; exposed via StreamSwarm
	local peer.ID       // ID of the local peer; exposed via LocalPeer
	peers peer.Peerstore // peerstore handed to NewSwarm
	connh ConnHandler   // NOTE(review): not assigned anywhere in the visible part of this file -- confirm it is still used

	// dialing maps each peer currently being dialed to a channel.
	// this way, we don't kick off N dials simultaneously.
	dialing   map[peer.ID]chan struct{}
	dialingmu sync.Mutex // presumably guards dialing -- confirm against the dial code

	// cg is the context group built from the ctx given to NewSwarm.
	// Close closes it, and its teardown closes the peerstream swarm.
	cg ctxgroup.ContextGroup
}

// NewSwarm constructs a Swarm for the given local peer and peerstore and
// starts listening on the usable subset of listenAddrs. It returns an error
// if listenAddrs is non-empty but none of its addresses is usable.
func NewSwarm(ctx context.Context, listenAddrs []ma.Multiaddr, local peer.ID, peers peer.Peerstore) (*Swarm, error) { if len(listenAddrs) > 0 { filtered := addrutil.FilterUsableAddrs(listenAddrs) if len(filtered) < 1 { return nil, fmt.Errorf("swarm cannot use any addr in: %s", listenAddrs) } listenAddrs = filtered } s := &Swarm{ swarm: ps.NewSwarm(PSTransport), local: local, peers: peers, cg: ctxgroup.WithContext(ctx), dialing: map[peer.ID]chan struct{}{}, } // configure Swarm s.cg.SetTeardown(s.teardown) s.SetConnHandler(nil) // make sure to setup our own conn handler. return s, s.listen(listenAddrs) } func (s *Swarm) teardown() error { return s.swarm.Close() } // CtxGroup returns the Context Group of the swarm func (s *Swarm) CtxGroup() ctxgroup.ContextGroup { return s.cg } // Close stops the Swarm. func (s *Swarm) Close() error { return s.cg.Close() } // StreamSwarm returns the underlying peerstream.Swarm func (s *Swarm) StreamSwarm() *ps.Swarm { return s.swarm } // SetConnHandler assigns the handler for new connections. // See peerstream. You will rarely use this. See SetStreamHandler func (s *Swarm) SetConnHandler(handler ConnHandler) { // handler is nil if user wants to clear the old handler. if handler == nil { s.swarm.SetConnHandler(func(psconn *ps.Conn) { s.connHandler(psconn) }) return } s.swarm.SetConnHandler(func(psconn *ps.Conn) { // sc is nil if closed in our handler. if sc := s.connHandler(psconn); sc != nil { // call the user's handler. in a goroutine for sync safety. go handler(sc) } }) } // SetStreamHandler assigns the handler for new streams. // See peerstream. func (s *Swarm) SetStreamHandler(handler inet.StreamHandler) { s.swarm.SetStreamHandler(func(s *ps.Stream) { handler(wrapStream(s)) }) } // NewStreamWithPeer creates a new stream on any available connection to p func (s *Swarm) NewStreamWithPeer(p peer.ID) (*Stream, error) { // if we have no connections, try connecting. 
if len(s.ConnectionsToPeer(p)) == 0 { log.Debug("Swarm: NewStreamWithPeer no connections. Attempting to connect...") if _, err := s.Dial(context.Background(), p); err != nil { return nil, err } } log.Debug("Swarm: NewStreamWithPeer...") st, err := s.swarm.NewStreamWithGroup(p) return wrapStream(st), err } // StreamsWithPeer returns all the live Streams to p func (s *Swarm) StreamsWithPeer(p peer.ID) []*Stream { return wrapStreams(ps.StreamsWithGroup(p, s.swarm.Streams())) } // ConnectionsToPeer returns all the live connections to p func (s *Swarm) ConnectionsToPeer(p peer.ID) []*Conn { return wrapConns(ps.ConnsWithGroup(p, s.swarm.Conns())) } // Connections returns a slice of all connections. func (s *Swarm) Connections() []*Conn { return wrapConns(s.swarm.Conns()) } // CloseConnection removes a given peer from swarm + closes the connection func (s *Swarm) CloseConnection(p peer.ID) error { conns := s.swarm.ConnsWithGroup(p) // boom. for _, c := range conns { c.Close() } return nil } // Peers returns a copy of the set of peers swarm is connected to. func (s *Swarm) Peers() []peer.ID { conns := s.Connections() seen := make(map[peer.ID]struct{}) peers := make([]peer.ID, 0, len(conns)) for _, c := range conns { p := c.RemotePeer() if _, found := seen[p]; found { continue } seen[p] = struct{}{} peers = append(peers, p) } return peers } // LocalPeer returns the local peer swarm is associated to. func (s *Swarm) LocalPeer() peer.ID { return s.local }