package queue

import (
	context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"

	peer "github.com/jbenet/go-ipfs/p2p/peer"
	eventlog "github.com/jbenet/go-ipfs/util/eventlog"
)

var log = eventlog.Logger("peerqueue")

// ChanQueue makes any PeerQueue synchronizable through channels.
//
// Queue is the wrapped PeerQueue. Once process has been started, a background
// goroutine calls Len, Enqueue and Dequeue on it.
// NOTE(review): presumably callers should not touch Queue directly while that
// goroutine is running; confirm against the PeerQueue implementations.
//
// EnqChan is the send-only side used to add peers: every peer.ID sent on it is
// enqueued into Queue by the background goroutine.
//
// DeqChan is the receive-only side used to take peers: it yields values
// dequeued from Queue and is closed when the background goroutine exits.
//
// Both channels are created and assigned by process, so they are nil on a
// ChanQueue that was not built with NewChanQueue.
type ChanQueue struct {
	Queue   PeerQueue
	EnqChan chan<- peer.ID
	DeqChan <-chan peer.ID
}

// NewChanQueue returns a ChanQueue whose Queue is pq.
//
// Before returning it calls process, which sets up EnqChan and DeqChan and
// launches the goroutine that services them; ctx is handed to that goroutine
// so it can stop when the context is done.
func NewChanQueue(ctx context.Context, pq PeerQueue) *ChanQueue {
	wrapped := new(ChanQueue)
	wrapped.Queue = pq
	wrapped.process(ctx)
	return wrapped
}

func (cq *ChanQueue) process(ctx context.Context) {
27
	log := log.Prefix("<ChanQueue %p>", cq)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
28
29
30
31
32
33
34
35
36

	// construct the channels here to be able to use them bidirectionally
	enqChan := make(chan peer.ID)
	deqChan := make(chan peer.ID)

	cq.EnqChan = enqChan
	cq.DeqChan = deqChan

	go func() {
37
38
		log.Debug("processing")
		defer log.Debug("closed")
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
39
40
41
42
43
44
45
46
		defer close(deqChan)

		var next peer.ID
		var item peer.ID
		var more bool

		for {
			if cq.Queue.Len() == 0 {
47
				// log.Debug("wait for enqueue")
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
48
49
50
51
52
				select {
				case next, more = <-enqChan:
					if !more {
						return
					}
53
					// log.Debug("got", next)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
54
55
56
57
58
59
60

				case <-ctx.Done():
					return
				}

			} else {
				next = cq.Queue.Dequeue()
61
				// log.Debug("peek", next)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
62
63
64
65
66
			}

			select {
			case item, more = <-enqChan:
				if !more {
67
68
69
70
					if cq.Queue.Len() > 0 {
						return // we're done done.
					}
					enqChan = nil // closed, so no use.
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
71
				}
72
				// log.Debug("got", item)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
73
				cq.Queue.Enqueue(item)
74
				cq.Queue.Enqueue(next) // order may have changed.
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
75
76
77
				next = ""

			case deqChan <- next:
78
				// log.Debug("dequeued", next)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
79
80
81
82
83
84
85
86
87
				next = ""

			case <-ctx.Done():
				return
			}
		}

	}()
}