Commit 9be03ea0 authored by whyrusleeping's avatar whyrusleeping
Browse files

add contexts to commit and precommit

parent 4a500030
package sectorbuilder package sectorbuilder
import ( import (
"context"
"fmt" "fmt"
"io" "io"
"io/ioutil" "io/ioutil"
...@@ -11,12 +12,11 @@ import ( ...@@ -11,12 +12,11 @@ import (
"sync/atomic" "sync/atomic"
sectorbuilder "github.com/filecoin-project/filecoin-ffi" sectorbuilder "github.com/filecoin-project/filecoin-ffi"
"github.com/ipfs/go-datastore" "github.com/filecoin-project/go-address"
datastore "github.com/ipfs/go-datastore"
logging "github.com/ipfs/go-log" logging "github.com/ipfs/go-log"
dcopy "github.com/otiai10/copy" dcopy "github.com/otiai10/copy"
"golang.org/x/xerrors" "golang.org/x/xerrors"
"github.com/filecoin-project/go-address"
) )
const PoStReservedWorkers = 1 const PoStReservedWorkers = 1
...@@ -424,7 +424,7 @@ func (sb *SectorBuilder) sealPreCommitRemote(call workerCall) (RawSealPreCommitO ...@@ -424,7 +424,7 @@ func (sb *SectorBuilder) sealPreCommitRemote(call workerCall) (RawSealPreCommitO
} }
} }
func (sb *SectorBuilder) SealPreCommit(sectorID uint64, ticket SealTicket, pieces []PublicPieceInfo) (RawSealPreCommitOutput, error) { func (sb *SectorBuilder) SealPreCommit(ctx context.Context, sectorID uint64, ticket SealTicket, pieces []PublicPieceInfo) (RawSealPreCommitOutput, error) {
fs := sb.filesystem fs := sb.filesystem
if err := fs.reserve(dataCache, sb.ssize); err != nil { if err := fs.reserve(dataCache, sb.ssize); err != nil {
...@@ -467,6 +467,8 @@ func (sb *SectorBuilder) SealPreCommit(sectorID uint64, ticket SealTicket, piece ...@@ -467,6 +467,8 @@ func (sb *SectorBuilder) SealPreCommit(sectorID uint64, ticket SealTicket, piece
case sb.precommitTasks <- call: case sb.precommitTasks <- call:
return sb.sealPreCommitRemote(call) return sb.sealPreCommitRemote(call)
case rl <- struct{}{}: case rl <- struct{}{}:
case <-ctx.Done():
return RawSealPreCommitOutput{}, ctx.Err()
} }
atomic.AddInt32(&sb.preCommitWait, -1) atomic.AddInt32(&sb.preCommitWait, -1)
...@@ -506,6 +508,7 @@ func (sb *SectorBuilder) SealPreCommit(sectorID uint64, ticket SealTicket, piece ...@@ -506,6 +508,7 @@ func (sb *SectorBuilder) SealPreCommit(sectorID uint64, ticket SealTicket, piece
stagedPath := sb.StagedSectorPath(sectorID) stagedPath := sb.StagedSectorPath(sectorID)
// TODO: context cancellation respect
rspco, err := sectorbuilder.SealPreCommit( rspco, err := sectorbuilder.SealPreCommit(
sb.ssize, sb.ssize,
PoRepProofPartitions, PoRepProofPartitions,
...@@ -571,7 +574,7 @@ func (sb *SectorBuilder) sealCommitLocal(sectorID uint64, ticket SealTicket, see ...@@ -571,7 +574,7 @@ func (sb *SectorBuilder) sealCommitLocal(sectorID uint64, ticket SealTicket, see
return proof, nil return proof, nil
} }
func (sb *SectorBuilder) SealCommit(sectorID uint64, ticket SealTicket, seed SealSeed, pieces []PublicPieceInfo, rspco RawSealPreCommitOutput) (proof []byte, err error) { func (sb *SectorBuilder) SealCommit(ctx context.Context, sectorID uint64, ticket SealTicket, seed SealSeed, pieces []PublicPieceInfo, rspco RawSealPreCommitOutput) (proof []byte, err error) {
call := workerCall{ call := workerCall{
task: WorkerTask{ task: WorkerTask{
Type: WorkerCommit, Type: WorkerCommit,
...@@ -604,6 +607,8 @@ func (sb *SectorBuilder) SealCommit(sectorID uint64, ticket SealTicket, seed Sea ...@@ -604,6 +607,8 @@ func (sb *SectorBuilder) SealCommit(sectorID uint64, ticket SealTicket, seed Sea
proof, err = sb.sealCommitRemote(call) proof, err = sb.sealCommitRemote(call)
case rl <- struct{}{}: case rl <- struct{}{}:
proof, err = sb.sealCommitLocal(sectorID, ticket, seed, pieces, rspco) proof, err = sb.sealCommitLocal(sectorID, ticket, seed, pieces, rspco)
case <-ctx.Done():
return nil, ctx.Err()
} }
} }
if err != nil { if err != nil {
......
...@@ -53,7 +53,7 @@ func (s *seal) precommit(t *testing.T, sb *sectorbuilder.SectorBuilder, sid uint ...@@ -53,7 +53,7 @@ func (s *seal) precommit(t *testing.T, sb *sectorbuilder.SectorBuilder, sid uint
TicketBytes: [32]byte{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 1, 2}, TicketBytes: [32]byte{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 1, 2},
} }
s.pco, err = sb.SealPreCommit(sid, s.ticket, []sectorbuilder.PublicPieceInfo{s.ppi}) s.pco, err = sb.SealPreCommit(context.TODO(), sid, s.ticket, []sectorbuilder.PublicPieceInfo{s.ppi})
if err != nil { if err != nil {
t.Fatalf("%+v", err) t.Fatalf("%+v", err)
} }
...@@ -67,7 +67,7 @@ func (s *seal) commit(t *testing.T, sb *sectorbuilder.SectorBuilder, done func() ...@@ -67,7 +67,7 @@ func (s *seal) commit(t *testing.T, sb *sectorbuilder.SectorBuilder, done func()
TicketBytes: [32]byte{0, 9, 8, 7, 6, 5, 4, 3, 2, 1, 0, 9, 8, 7, 6, 45, 3, 2, 1, 0, 9, 8, 7, 6, 5, 4, 3, 2, 1, 0, 9}, TicketBytes: [32]byte{0, 9, 8, 7, 6, 5, 4, 3, 2, 1, 0, 9, 8, 7, 6, 45, 3, 2, 1, 0, 9, 8, 7, 6, 5, 4, 3, 2, 1, 0, 9},
} }
proof, err := sb.SealCommit(s.sid, s.ticket, seed, []sectorbuilder.PublicPieceInfo{s.ppi}, s.pco) proof, err := sb.SealCommit(context.TODO(), s.sid, s.ticket, seed, []sectorbuilder.PublicPieceInfo{s.ppi}, s.pco)
if err != nil { if err != nil {
t.Fatalf("%+v", err) t.Fatalf("%+v", err)
} }
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment