package queue

import (
	context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"

	peer "github.com/jbenet/go-ipfs/p2p/peer"
	eventlog "github.com/jbenet/go-ipfs/thirdparty/eventlog"
)

// log is this package's event logger, registered under the name "peerqueue".
var log = eventlog.Logger("peerqueue")

// ChanQueue makes any PeerQueue synchronizable through channels.
//
// A goroutine started by process owns the traffic between the two channels
// and Queue: it receives peers from the channel behind EnqChan and offers
// peers on the channel behind DeqChan.
type ChanQueue struct {
	// Queue is the wrapped PeerQueue. The processing goroutine calls its
	// Len, Enqueue and Dequeue methods.
	Queue   PeerQueue
	// EnqChan is the send-only end used to hand peers to the queue.
	EnqChan chan<- peer.ID
	// DeqChan is the receive-only end on which peers are handed back out.
	// It is closed when the processing goroutine exits.
	DeqChan <-chan peer.ID
}

// NewChanQueue returns a ChanQueue wrapping pq. Before returning it calls
// process, which sets EnqChan and DeqChan and starts the goroutine serving
// them; that goroutine stops once ctx is done.
func NewChanQueue(ctx context.Context, pq PeerQueue) *ChanQueue {
	wrapped := new(ChanQueue)
	wrapped.Queue = pq
	wrapped.process(ctx)
	return wrapped
}

func (cq *ChanQueue) process(ctx context.Context) {
	// construct the channels here to be able to use them bidirectionally
	enqChan := make(chan peer.ID)
	deqChan := make(chan peer.ID)

	cq.EnqChan = enqChan
	cq.DeqChan = deqChan

	go func() {
35
36
		log.Debug("processing")
		defer log.Debug("closed")
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
37
38
39
40
41
42
43
44
		defer close(deqChan)

		var next peer.ID
		var item peer.ID
		var more bool

		for {
			if cq.Queue.Len() == 0 {
45
				// log.Debug("wait for enqueue")
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
46
47
48
49
50
				select {
				case next, more = <-enqChan:
					if !more {
						return
					}
51
					// log.Debug("got", next)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
52
53
54
55
56
57
58

				case <-ctx.Done():
					return
				}

			} else {
				next = cq.Queue.Dequeue()
59
				// log.Debug("peek", next)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
60
61
62
63
64
			}

			select {
			case item, more = <-enqChan:
				if !more {
65
66
67
68
					if cq.Queue.Len() > 0 {
						return // we're done done.
					}
					enqChan = nil // closed, so no use.
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
69
				}
70
				// log.Debug("got", item)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
71
				cq.Queue.Enqueue(item)
72
				cq.Queue.Enqueue(next) // order may have changed.
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
73
74
75
				next = ""

			case deqChan <- next:
76
				// log.Debug("dequeued", next)
Juan Batiz-Benet's avatar
Juan Batiz-Benet committed
77
78
79
80
81
82
83
84
85
				next = ""

			case <-ctx.Done():
				return
			}
		}

	}()
}