sync.go 1.65 KB
Newer Older
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
1
2
3
package queue

import (
Jeromy's avatar
Jeromy committed
4
	logging "QmWRypnfEwrgH4k93KEHN5hng7VjKYkWmzDYRuTZeh2Mgh/go-log"
Jeromy's avatar
Jeromy committed
5
6
	context "QmacZi9WygGK7Me8mH53pypyscHzU386aUZXpr28GZgUct/context"
	peer "github.com/ipfs/go-libp2p/p2p/peer"
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
7
8
)

Jeromy's avatar
Jeromy committed
9
var log = logging.Logger("peerqueue")
10

Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
// ChanQueue makes any PeerQueue synchronizable through channels.
type ChanQueue struct {
	Queue   PeerQueue
	EnqChan chan<- peer.ID
	DeqChan <-chan peer.ID
}

// NewChanQueue creates a ChanQueue by wrapping pq.
func NewChanQueue(ctx context.Context, pq PeerQueue) *ChanQueue {
	cq := &ChanQueue{Queue: pq}
	cq.process(ctx)
	return cq
}

func (cq *ChanQueue) process(ctx context.Context) {
	// construct the channels here to be able to use them bidirectionally
	enqChan := make(chan peer.ID)
	deqChan := make(chan peer.ID)

	cq.EnqChan = enqChan
	cq.DeqChan = deqChan

	go func() {
34
35
		log.Debug("processing")
		defer log.Debug("closed")
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
36
37
38
39
40
41
42
43
		defer close(deqChan)

		var next peer.ID
		var item peer.ID
		var more bool

		for {
			if cq.Queue.Len() == 0 {
44
				// log.Debug("wait for enqueue")
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
45
46
47
48
49
				select {
				case next, more = <-enqChan:
					if !more {
						return
					}
50
					// log.Debug("got", next)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
51
52
53
54
55
56
57

				case <-ctx.Done():
					return
				}

			} else {
				next = cq.Queue.Dequeue()
58
				// log.Debug("peek", next)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
59
60
61
62
63
			}

			select {
			case item, more = <-enqChan:
				if !more {
64
65
66
67
					if cq.Queue.Len() > 0 {
						return // we're done done.
					}
					enqChan = nil // closed, so no use.
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
68
				}
69
				// log.Debug("got", item)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
70
				cq.Queue.Enqueue(item)
71
				cq.Queue.Enqueue(next) // order may have changed.
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
72
73
74
				next = ""

			case deqChan <- next:
75
				// log.Debug("dequeued", next)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
76
77
78
79
80
81
82
83
84
				next = ""

			case <-ctx.Done():
				return
			}
		}

	}()
}