mdns.go 3.77 KB
Newer Older
1
2
3
package discovery

import (
Jeromy's avatar
Jeromy committed
4
	"context"
Jeromy's avatar
Jeromy committed
5
	"errors"
6
7
8
9
10
11
12
	"io"
	"io/ioutil"
	golog "log"
	"net"
	"sync"
	"time"

Jeromy's avatar
Jeromy committed
13
	"github.com/ipfs/go-libp2p-peer"
Jeromy's avatar
Jeromy committed
14
	pstore "github.com/ipfs/go-libp2p-peerstore"
Jeromy's avatar
Jeromy committed
15
16
17
	logging "github.com/ipfs/go-log"
	ma "github.com/jbenet/go-multiaddr"
	manet "github.com/jbenet/go-multiaddr-net"
18
	"github.com/libp2p/go-libp2p/p2p/host"
19
	"github.com/whyrusleeping/mdns"
20
21
)

Jeromy's avatar
Jeromy committed
22
var log = logging.Logger("mdns")
23

24
const ServiceTag = "_ipfs-discovery._udp"
25
26
27
28
29
30
31
32

type Service interface {
	io.Closer
	RegisterNotifee(Notifee)
	UnregisterNotifee(Notifee)
}

type Notifee interface {
Jeromy's avatar
Jeromy committed
33
	HandlePeerFound(pstore.PeerInfo)
34
35
36
37
38
39
40
41
42
}

type mdnsService struct {
	server  *mdns.Server
	service *mdns.MDNSService
	host    host.Host

	lk       sync.Mutex
	notifees []Notifee
Jeromy's avatar
Jeromy committed
43
	interval time.Duration
44
45
}

Jeromy's avatar
Jeromy committed
46
47
func getDialableListenAddrs(ph host.Host) ([]*net.TCPAddr, error) {
	var out []*net.TCPAddr
Jeromy's avatar
Jeromy committed
48
49
50
51
52
53
54
	for _, addr := range ph.Addrs() {
		na, err := manet.ToNetAddr(addr)
		if err != nil {
			continue
		}
		tcp, ok := na.(*net.TCPAddr)
		if ok {
Jeromy's avatar
Jeromy committed
55
			out = append(out, tcp)
Jeromy's avatar
Jeromy committed
56
57
		}
	}
Jeromy's avatar
Jeromy committed
58
59
60
61
	if len(out) == 0 {
		return nil, errors.New("failed to find good external addr from peerhost")
	}
	return out, nil
Jeromy's avatar
Jeromy committed
62
63
}

Jeromy's avatar
Jeromy committed
64
func NewMdnsService(ctx context.Context, peerhost host.Host, interval time.Duration) (Service, error) {
65
66
67
68

	// TODO: dont let mdns use logging...
	golog.SetOutput(ioutil.Discard)

Jeromy's avatar
Jeromy committed
69
	var ipaddrs []net.IP
70
	port := 4001
Jeromy's avatar
Jeromy committed
71

Jeromy's avatar
Jeromy committed
72
	addrs, err := getDialableListenAddrs(peerhost)
Jeromy's avatar
Jeromy committed
73
74
75
	if err != nil {
		log.Warning(err)
	} else {
Jeromy's avatar
Jeromy committed
76
77
78
79
		port = addrs[0].Port
		for _, a := range addrs {
			ipaddrs = append(ipaddrs, a.IP)
		}
80
	}
Jeromy's avatar
Jeromy committed
81

82
83
84
	myid := peerhost.ID().Pretty()

	info := []string{myid}
Jeromy's avatar
Jeromy committed
85
	service, err := mdns.NewMDNSService(myid, ServiceTag, "", "", port, ipaddrs, info)
86
87
88
89
90
91
92
93
94
95
96
	if err != nil {
		return nil, err
	}

	// Create the mDNS server, defer shutdown
	server, err := mdns.NewServer(&mdns.Config{Zone: service})
	if err != nil {
		return nil, err
	}

	s := &mdnsService{
Jeromy's avatar
Jeromy committed
97
98
99
100
		server:   server,
		service:  service,
		host:     peerhost,
		interval: interval,
101
102
	}

Jeromy's avatar
Jeromy committed
103
	go s.pollForEntries(ctx)
104
105
106
107
108
109
110
111

	return s, nil
}

func (m *mdnsService) Close() error {
	return m.server.Shutdown()
}

Jeromy's avatar
Jeromy committed
112
113
func (m *mdnsService) pollForEntries(ctx context.Context) {

Jeromy's avatar
Jeromy committed
114
	ticker := time.NewTicker(m.interval)
Jeromy's avatar
Jeromy committed
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
	for {
		select {
		case <-ticker.C:
			entriesCh := make(chan *mdns.ServiceEntry, 16)
			go func() {
				for entry := range entriesCh {
					m.handleEntry(entry)
				}
			}()

			log.Debug("starting mdns query")
			qp := &mdns.QueryParam{
				Domain:  "local",
				Entries: entriesCh,
				Service: ServiceTag,
				Timeout: time.Second * 5,
131
			}
John Steidley's avatar
John Steidley committed
132

Jeromy's avatar
Jeromy committed
133
134
135
136
137
138
139
140
141
			err := mdns.Query(qp)
			if err != nil {
				log.Error("mdns lookup error: ", err)
			}
			close(entriesCh)
			log.Debug("mdns query complete")
		case <-ctx.Done():
			log.Debug("mdns service halting")
			return
142
143
144
145
146
		}
	}
}

func (m *mdnsService) handleEntry(e *mdns.ServiceEntry) {
Jeromy's avatar
Jeromy committed
147
	log.Debugf("Handling MDNS entry: %s:%d %s", e.AddrV4, e.Port, e.Info)
148
149
150
151
152
153
154
	mpeer, err := peer.IDB58Decode(e.Info)
	if err != nil {
		log.Warning("Error parsing peer ID from mdns entry: ", err)
		return
	}

	if mpeer == m.host.ID() {
Jeromy's avatar
Jeromy committed
155
		log.Debug("got our own mdns entry, skipping")
156
157
158
159
160
161
162
163
164
165
166
167
		return
	}

	maddr, err := manet.FromNetAddr(&net.TCPAddr{
		IP:   e.AddrV4,
		Port: e.Port,
	})
	if err != nil {
		log.Warning("Error parsing multiaddr from mdns entry: ", err)
		return
	}

Jeromy's avatar
Jeromy committed
168
	pi := pstore.PeerInfo{
169
170
171
172
173
174
		ID:    mpeer,
		Addrs: []ma.Multiaddr{maddr},
	}

	m.lk.Lock()
	for _, n := range m.notifees {
Jeromy's avatar
Jeromy committed
175
		go n.HandlePeerFound(pi)
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
	}
	m.lk.Unlock()
}

func (m *mdnsService) RegisterNotifee(n Notifee) {
	m.lk.Lock()
	m.notifees = append(m.notifees, n)
	m.lk.Unlock()
}

func (m *mdnsService) UnregisterNotifee(n Notifee) {
	m.lk.Lock()
	found := -1
	for i, notif := range m.notifees {
		if notif == n {
			found = i
			break
		}
	}
	if found != -1 {
		m.notifees = append(m.notifees[:found], m.notifees[found+1:]...)
	}
	m.lk.Unlock()
}