Skip to content
GitLab
Menu
Projects
Groups
Snippets
Loading...
Help
Help
Support
Community forum
Keyboard shortcuts
?
Submit feedback
Sign in / Register
Toggle navigation
Menu
Open sidebar
adam.huang
go-libp2p
Commits
3b0da8a3
Commit
3b0da8a3
authored
Jan 05, 2015
by
Juan Batiz-Benet
Browse files
peer/queue: close fix, and logging
parent
d03fad69
Changes
1
Show whitespace changes
Inline
Side-by-side
peer/queue/sync.go
View file @
3b0da8a3
...
@@ -4,8 +4,11 @@ import (
...
@@ -4,8 +4,11 @@ import (
context
"github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
context
"github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
peer
"github.com/jbenet/go-ipfs/p2p/peer"
peer
"github.com/jbenet/go-ipfs/p2p/peer"
eventlog
"github.com/jbenet/go-ipfs/util/eventlog"
)
)
var
log
=
eventlog
.
Logger
(
"peerqueue"
)
// ChanQueue makes any PeerQueue synchronizable through channels.
// ChanQueue makes any PeerQueue synchronizable through channels.
type
ChanQueue
struct
{
type
ChanQueue
struct
{
Queue
PeerQueue
Queue
PeerQueue
...
@@ -21,6 +24,7 @@ func NewChanQueue(ctx context.Context, pq PeerQueue) *ChanQueue {
...
@@ -21,6 +24,7 @@ func NewChanQueue(ctx context.Context, pq PeerQueue) *ChanQueue {
}
}
func
(
cq
*
ChanQueue
)
process
(
ctx
context
.
Context
)
{
func
(
cq
*
ChanQueue
)
process
(
ctx
context
.
Context
)
{
log
:=
log
.
Prefix
(
"<ChanQueue %p>"
,
cq
)
// construct the channels here to be able to use them bidirectionally
// construct the channels here to be able to use them bidirectionally
enqChan
:=
make
(
chan
peer
.
ID
)
enqChan
:=
make
(
chan
peer
.
ID
)
...
@@ -30,6 +34,8 @@ func (cq *ChanQueue) process(ctx context.Context) {
...
@@ -30,6 +34,8 @@ func (cq *ChanQueue) process(ctx context.Context) {
cq
.
DeqChan
=
deqChan
cq
.
DeqChan
=
deqChan
go
func
()
{
go
func
()
{
log
.
Debug
(
"processing"
)
defer
log
.
Debug
(
"closed"
)
defer
close
(
deqChan
)
defer
close
(
deqChan
)
var
next
peer
.
ID
var
next
peer
.
ID
...
@@ -38,11 +44,13 @@ func (cq *ChanQueue) process(ctx context.Context) {
...
@@ -38,11 +44,13 @@ func (cq *ChanQueue) process(ctx context.Context) {
for
{
for
{
if
cq
.
Queue
.
Len
()
==
0
{
if
cq
.
Queue
.
Len
()
==
0
{
// log.Debug("wait for enqueue")
select
{
select
{
case
next
,
more
=
<-
enqChan
:
case
next
,
more
=
<-
enqChan
:
if
!
more
{
if
!
more
{
return
return
}
}
// log.Debug("got", next)
case
<-
ctx
.
Done
()
:
case
<-
ctx
.
Done
()
:
return
return
...
@@ -50,19 +58,24 @@ func (cq *ChanQueue) process(ctx context.Context) {
...
@@ -50,19 +58,24 @@ func (cq *ChanQueue) process(ctx context.Context) {
}
else
{
}
else
{
next
=
cq
.
Queue
.
Dequeue
()
next
=
cq
.
Queue
.
Dequeue
()
// log.Debug("peek", next)
}
}
select
{
select
{
case
item
,
more
=
<-
enqChan
:
case
item
,
more
=
<-
enqChan
:
if
!
more
{
if
!
more
{
return
if
cq
.
Queue
.
Len
()
>
0
{
return
// we're done done.
}
}
enqChan
=
nil
// closed, so no use.
}
// log.Debug("got", item)
cq
.
Queue
.
Enqueue
(
item
)
cq
.
Queue
.
Enqueue
(
item
)
cq
.
Queue
.
Enqueue
(
next
)
cq
.
Queue
.
Enqueue
(
next
)
// order may have changed.
next
=
""
next
=
""
case
deqChan
<-
next
:
case
deqChan
<-
next
:
// log.Debug("dequeued", next)
next
=
""
next
=
""
case
<-
ctx
.
Done
()
:
case
<-
ctx
.
Done
()
:
...
...
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
.
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment