Unverified Commit 774d1987 authored by Łukasz Magiera's avatar Łukasz Magiera Committed by GitHub
Browse files

Merge pull request #68 from filecoin-project/feat/no-cgo

Build with no cgo
parents 4103afa8 aa2fbca2
...@@ -72,6 +72,8 @@ jobs: ...@@ -72,6 +72,8 @@ jobs:
key: v1-go-deps-{{ arch }}-{{ checksum "/home/circleci/project/go.mod" }} key: v1-go-deps-{{ arch }}-{{ checksum "/home/circleci/project/go.mod" }}
- run: - run:
command: make build command: make build
- run:
command: CGO_ENABLED=0 make build
test: &test test: &test
description: | description: |
......
Subproject commit bb699517a5904b3d2549ac97e2b0005ab6471dce Subproject commit 58761d3dca0f3617023240eb8b7f8b324db75b2a
...@@ -5,7 +5,7 @@ go 1.13 ...@@ -5,7 +5,7 @@ go 1.13
require ( require (
github.com/fatih/color v1.7.0 // indirect github.com/fatih/color v1.7.0 // indirect
github.com/filecoin-project/filecoin-ffi v0.0.0-20191219131535-bb699517a590 github.com/filecoin-project/filecoin-ffi v0.0.0-20191219131535-bb699517a590
github.com/filecoin-project/go-address v0.0.0-20191219011437-af739c490b4f github.com/filecoin-project/go-address v0.0.0-20200107215422-da8eea2842b5
github.com/filecoin-project/go-paramfetch v0.0.0-20200102181131-b20d579f2878 github.com/filecoin-project/go-paramfetch v0.0.0-20200102181131-b20d579f2878
github.com/gogo/protobuf v1.3.1 // indirect github.com/gogo/protobuf v1.3.1 // indirect
github.com/gopherjs/gopherjs v0.0.0-20190812055157-5d271430af9f // indirect github.com/gopherjs/gopherjs v0.0.0-20190812055157-5d271430af9f // indirect
......
...@@ -13,8 +13,8 @@ github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c ...@@ -13,8 +13,8 @@ github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/fatih/color v1.7.0 h1:DkWD4oS2D8LGGgTQ6IvwJJXSL5Vp2ffcQg58nFV38Ys= github.com/fatih/color v1.7.0 h1:DkWD4oS2D8LGGgTQ6IvwJJXSL5Vp2ffcQg58nFV38Ys=
github.com/fatih/color v1.7.0/go.mod h1:Zm6kSWBoL9eyXnKyktHP6abPY2pDugNf5KwzbycvMj4= github.com/fatih/color v1.7.0/go.mod h1:Zm6kSWBoL9eyXnKyktHP6abPY2pDugNf5KwzbycvMj4=
github.com/filecoin-project/go-address v0.0.0-20191219011437-af739c490b4f h1:L2jaVU8TvWTx7iZPhlYvUE8vkoOnj778XuKavz8W36g= github.com/filecoin-project/go-address v0.0.0-20200107215422-da8eea2842b5 h1:/MmWluswvDIbuPvBct4q6HeQgVm62O2DzWYTB38kt4A=
github.com/filecoin-project/go-address v0.0.0-20191219011437-af739c490b4f/go.mod h1:rCbpXPva2NKF9/J4X6sr7hbKBgQCxyFtRj7KOZqoIms= github.com/filecoin-project/go-address v0.0.0-20200107215422-da8eea2842b5/go.mod h1:SAOwJoakQ8EPjwNIsiakIQKsoKdkcbx8U3IapgCg9R0=
github.com/filecoin-project/go-crypto v0.0.0-20191218222705-effae4ea9f03 h1:2pMXdBnCiXjfCYx/hLqFxccPoqsSveQFxVLvNxy9bus= github.com/filecoin-project/go-crypto v0.0.0-20191218222705-effae4ea9f03 h1:2pMXdBnCiXjfCYx/hLqFxccPoqsSveQFxVLvNxy9bus=
github.com/filecoin-project/go-crypto v0.0.0-20191218222705-effae4ea9f03/go.mod h1:+viYnvGtUTgJRdy6oaeF4MTFKAfatX071MPDPBL11EQ= github.com/filecoin-project/go-crypto v0.0.0-20191218222705-effae4ea9f03/go.mod h1:+viYnvGtUTgJRdy6oaeF4MTFKAfatX071MPDPBL11EQ=
github.com/filecoin-project/go-paramfetch v0.0.0-20200102181131-b20d579f2878 h1:YicJT9xhPzZ1SBGiJFNUCkfwqK/G9vFyY1ytKBSjNJA= github.com/filecoin-project/go-paramfetch v0.0.0-20200102181131-b20d579f2878 h1:YicJT9xhPzZ1SBGiJFNUCkfwqK/G9vFyY1ytKBSjNJA=
......
...@@ -34,6 +34,3 @@ type Verifier interface { ...@@ -34,6 +34,3 @@ type Verifier interface {
VerifyFallbackPost(ctx context.Context, sectorSize uint64, sectorInfo SortedPublicSectorInfo, challengeSeed []byte, proof []byte, candidates []EPostCandidate, proverID address.Address, faults uint64) (bool, error) VerifyFallbackPost(ctx context.Context, sectorSize uint64, sectorInfo SortedPublicSectorInfo, challengeSeed []byte, proof []byte, candidates []EPostCandidate, proverID address.Address, faults uint64) (bool, error)
VerifySeal(sectorSize uint64, commR, commD []byte, proverID address.Address, ticket []byte, seed []byte, sectorID uint64, proof []byte) (bool, error) VerifySeal(sectorSize uint64, commR, commD []byte, proverID address.Address, ticket []byte, seed []byte, sectorID uint64, proof []byte) (bool, error)
} }
var _ Verifier = ProofVerifier
var _ Interface = &SectorBuilder{}
//+build cgo
package sectorbuilder
import (
"context"
"io"
"os"
"sync/atomic"
ffi "github.com/filecoin-project/filecoin-ffi"
"golang.org/x/xerrors"
)
var _ Interface = &SectorBuilder{}
func (sb *SectorBuilder) AddPiece(pieceSize uint64, sectorId uint64, file io.Reader, existingPieceSizes []uint64) (PublicPieceInfo, error) {
fs := sb.filesystem
if err := fs.reserve(dataStaging, sb.ssize); err != nil {
return PublicPieceInfo{}, err
}
defer fs.free(dataStaging, sb.ssize)
atomic.AddInt32(&sb.addPieceWait, 1)
ret := sb.RateLimit()
atomic.AddInt32(&sb.addPieceWait, -1)
defer ret()
f, werr, err := toReadableFile(file, int64(pieceSize))
if err != nil {
return PublicPieceInfo{}, err
}
stagedFile, err := sb.stagedSectorFile(sectorId)
if err != nil {
return PublicPieceInfo{}, err
}
_, _, commP, err := ffi.WriteWithAlignment(f, pieceSize, stagedFile, existingPieceSizes)
if err != nil {
return PublicPieceInfo{}, err
}
if err := stagedFile.Close(); err != nil {
return PublicPieceInfo{}, err
}
if err := f.Close(); err != nil {
return PublicPieceInfo{}, err
}
return PublicPieceInfo{
Size: pieceSize,
CommP: commP,
}, werr()
}
func (sb *SectorBuilder) ReadPieceFromSealedSector(sectorID uint64, offset uint64, size uint64, ticket []byte, commD []byte) (io.ReadCloser, error) {
fs := sb.filesystem
if err := fs.reserve(dataUnsealed, sb.ssize); err != nil { // TODO: this needs to get smarter when we start supporting partial unseals
return nil, err
}
defer fs.free(dataUnsealed, sb.ssize)
atomic.AddInt32(&sb.unsealWait, 1)
// TODO: Don't wait if cached
ret := sb.RateLimit() // TODO: check perf, consider remote unseal worker
defer ret()
atomic.AddInt32(&sb.unsealWait, -1)
sb.unsealLk.Lock() // TODO: allow unsealing unrelated sectors in parallel
defer sb.unsealLk.Unlock()
cacheDir, err := sb.sectorCacheDir(sectorID)
if err != nil {
return nil, err
}
sealedPath, err := sb.SealedSectorPath(sectorID)
if err != nil {
return nil, err
}
unsealedPath := sb.unsealedSectorPath(sectorID)
// TODO: GC for those
// (Probably configurable count of sectors to be kept unsealed, and just
// remove last used one (or use whatever other cache policy makes sense))
f, err := os.OpenFile(unsealedPath, os.O_RDONLY, 0644)
if err != nil {
if !os.IsNotExist(err) {
return nil, err
}
var commd [CommLen]byte
copy(commd[:], commD)
var tkt [CommLen]byte
copy(tkt[:], ticket)
err = ffi.Unseal(sb.ssize,
PoRepProofPartitions,
cacheDir,
sealedPath,
unsealedPath,
sectorID,
addressToProverID(sb.Miner),
tkt,
commd)
if err != nil {
return nil, xerrors.Errorf("unseal failed: %w", err)
}
f, err = os.OpenFile(unsealedPath, os.O_RDONLY, 0644)
if err != nil {
return nil, err
}
}
if _, err := f.Seek(int64(offset), io.SeekStart); err != nil {
return nil, xerrors.Errorf("seek: %w", err)
}
lr := io.LimitReader(f, int64(size))
return &struct {
io.Reader
io.Closer
}{
Reader: lr,
Closer: f,
}, nil
}
func (sb *SectorBuilder) SealPreCommit(ctx context.Context, sectorID uint64, ticket SealTicket, pieces []PublicPieceInfo) (RawSealPreCommitOutput, error) {
fs := sb.filesystem
if err := fs.reserve(dataCache, sb.ssize); err != nil {
return RawSealPreCommitOutput{}, err
}
defer fs.free(dataCache, sb.ssize)
if err := fs.reserve(dataSealed, sb.ssize); err != nil {
return RawSealPreCommitOutput{}, err
}
defer fs.free(dataSealed, sb.ssize)
call := workerCall{
task: WorkerTask{
Type: WorkerPreCommit,
TaskID: atomic.AddUint64(&sb.taskCtr, 1),
SectorID: sectorID,
SealTicket: ticket,
Pieces: pieces,
},
ret: make(chan SealRes),
}
atomic.AddInt32(&sb.preCommitWait, 1)
select { // prefer remote
case sb.precommitTasks <- call:
return sb.sealPreCommitRemote(call)
default:
}
sb.checkRateLimit()
rl := sb.rateLimit
if sb.noPreCommit {
rl = make(chan struct{})
}
select { // use whichever is available
case sb.precommitTasks <- call:
return sb.sealPreCommitRemote(call)
case rl <- struct{}{}:
case <-ctx.Done():
return RawSealPreCommitOutput{}, ctx.Err()
}
atomic.AddInt32(&sb.preCommitWait, -1)
// local
defer func() {
<-sb.rateLimit
}()
cacheDir, err := sb.sectorCacheDir(sectorID)
if err != nil {
return RawSealPreCommitOutput{}, xerrors.Errorf("getting cache dir: %w", err)
}
sealedPath, err := sb.SealedSectorPath(sectorID)
if err != nil {
return RawSealPreCommitOutput{}, xerrors.Errorf("getting sealed sector path: %w", err)
}
e, err := os.OpenFile(sealedPath, os.O_RDWR|os.O_CREATE, 0644)
if err != nil {
return RawSealPreCommitOutput{}, xerrors.Errorf("ensuring sealed file exists: %w", err)
}
if err := e.Close(); err != nil {
return RawSealPreCommitOutput{}, err
}
var sum uint64
for _, piece := range pieces {
sum += piece.Size
}
ussize := UserBytesForSectorSize(sb.ssize)
if sum != ussize {
return RawSealPreCommitOutput{}, xerrors.Errorf("aggregated piece sizes don't match sector size: %d != %d (%d)", sum, ussize, int64(ussize-sum))
}
stagedPath := sb.StagedSectorPath(sectorID)
// TODO: context cancellation respect
rspco, err := ffi.SealPreCommit(
sb.ssize,
PoRepProofPartitions,
cacheDir,
stagedPath,
sealedPath,
sectorID,
addressToProverID(sb.Miner),
ticket.TicketBytes,
pieces,
)
if err != nil {
return RawSealPreCommitOutput{}, xerrors.Errorf("presealing sector %d (%s): %w", sectorID, stagedPath, err)
}
return RawSealPreCommitOutput(rspco), nil
}
func (sb *SectorBuilder) sealCommitLocal(sectorID uint64, ticket SealTicket, seed SealSeed, pieces []PublicPieceInfo, rspco RawSealPreCommitOutput) (proof []byte, err error) {
atomic.AddInt32(&sb.commitWait, -1)
defer func() {
<-sb.rateLimit
}()
cacheDir, err := sb.sectorCacheDir(sectorID)
if err != nil {
return nil, err
}
proof, err = ffi.SealCommit(
sb.ssize,
PoRepProofPartitions,
cacheDir,
sectorID,
addressToProverID(sb.Miner),
ticket.TicketBytes,
seed.TicketBytes,
pieces,
ffi.RawSealPreCommitOutput(rspco),
)
if err != nil {
log.Warn("StandaloneSealCommit error: ", err)
log.Warnf("sid:%d tkt:%v seed:%v, ppi:%v rspco:%v", sectorID, ticket, seed, pieces, rspco)
return nil, xerrors.Errorf("StandaloneSealCommit: %w", err)
}
return proof, nil
}
func (sb *SectorBuilder) SealCommit(ctx context.Context, sectorID uint64, ticket SealTicket, seed SealSeed, pieces []PublicPieceInfo, rspco RawSealPreCommitOutput) (proof []byte, err error) {
call := workerCall{
task: WorkerTask{
Type: WorkerCommit,
TaskID: atomic.AddUint64(&sb.taskCtr, 1),
SectorID: sectorID,
SealTicket: ticket,
Pieces: pieces,
SealSeed: seed,
Rspco: rspco,
},
ret: make(chan SealRes),
}
atomic.AddInt32(&sb.commitWait, 1)
select { // prefer remote
case sb.commitTasks <- call:
proof, err = sb.sealCommitRemote(call)
default:
sb.checkRateLimit()
rl := sb.rateLimit
if sb.noCommit {
rl = make(chan struct{})
}
select { // use whichever is available
case sb.commitTasks <- call:
proof, err = sb.sealCommitRemote(call)
case rl <- struct{}{}:
proof, err = sb.sealCommitLocal(sectorID, ticket, seed, pieces, rspco)
case <-ctx.Done():
return nil, ctx.Err()
}
}
if err != nil {
return nil, xerrors.Errorf("commit: %w", err)
}
return proof, nil
}
func (sb *SectorBuilder) ComputeElectionPoSt(sectorInfo SortedPublicSectorInfo, challengeSeed []byte, winners []EPostCandidate) ([]byte, error) {
if len(challengeSeed) != CommLen {
return nil, xerrors.Errorf("given challenge seed was the wrong length: %d != %d", len(challengeSeed), CommLen)
}
var cseed [CommLen]byte
copy(cseed[:], challengeSeed)
privsects, err := sb.pubSectorToPriv(sectorInfo, nil) // TODO: faults
if err != nil {
return nil, err
}
proverID := addressToProverID(sb.Miner)
return ffi.GeneratePoSt(sb.ssize, proverID, privsects, cseed, winners)
}
func (sb *SectorBuilder) GenerateEPostCandidates(sectorInfo SortedPublicSectorInfo, challengeSeed [CommLen]byte, faults []uint64) ([]EPostCandidate, error) {
privsectors, err := sb.pubSectorToPriv(sectorInfo, faults)
if err != nil {
return nil, err
}
challengeCount := ElectionPostChallengeCount(uint64(len(sectorInfo.Values())), uint64(len(faults)))
proverID := addressToProverID(sb.Miner)
return ffi.GenerateCandidates(sb.ssize, proverID, challengeSeed, challengeCount, privsectors)
}
func (sb *SectorBuilder) GenerateFallbackPoSt(sectorInfo SortedPublicSectorInfo, challengeSeed [CommLen]byte, faults []uint64) ([]EPostCandidate, []byte, error) {
privsectors, err := sb.pubSectorToPriv(sectorInfo, faults)
if err != nil {
return nil, nil, err
}
challengeCount := fallbackPostChallengeCount(uint64(len(sectorInfo.Values())), uint64(len(faults)))
proverID := addressToProverID(sb.Miner)
candidates, err := ffi.GenerateCandidates(sb.ssize, proverID, challengeSeed, challengeCount, privsectors)
if err != nil {
return nil, nil, err
}
proof, err := ffi.GeneratePoSt(sb.ssize, proverID, privsectors, challengeSeed, candidates)
return candidates, proof, err
}
package sectorbuilder package sectorbuilder
import ( import (
"context"
"fmt" "fmt"
"io"
"io/ioutil" "io/ioutil"
"os" "os"
"path/filepath" "path/filepath"
"strconv" "strconv"
"sync"
"sync/atomic" "sync/atomic"
sectorbuilder "github.com/filecoin-project/filecoin-ffi" ffi "github.com/filecoin-project/filecoin-ffi"
"github.com/filecoin-project/go-address" "github.com/filecoin-project/go-address"
datastore "github.com/ipfs/go-datastore" datastore "github.com/ipfs/go-datastore"
logging "github.com/ipfs/go-log" logging "github.com/ipfs/go-log"
...@@ -26,72 +23,6 @@ var lastSectorIdKey = datastore.NewKey("/last") ...@@ -26,72 +23,6 @@ var lastSectorIdKey = datastore.NewKey("/last")
var log = logging.Logger("sectorbuilder") var log = logging.Logger("sectorbuilder")
type SortedPublicSectorInfo = sectorbuilder.SortedPublicSectorInfo
type SortedPrivateSectorInfo = sectorbuilder.SortedPrivateSectorInfo
type SealTicket = sectorbuilder.SealTicket
type SealSeed = sectorbuilder.SealSeed
type SealPreCommitOutput = sectorbuilder.SealPreCommitOutput
type SealCommitOutput = sectorbuilder.SealCommitOutput
type PublicPieceInfo = sectorbuilder.PublicPieceInfo
type RawSealPreCommitOutput sectorbuilder.RawSealPreCommitOutput
type EPostCandidate = sectorbuilder.Candidate
const CommLen = sectorbuilder.CommitmentBytesLen
type WorkerCfg struct {
NoPreCommit bool
NoCommit bool
// TODO: 'cost' info, probably in terms of sealing + transfer speed
}
type SectorBuilder struct {
ds datastore.Batching
idLk sync.Mutex
ssize uint64
lastID uint64
Miner address.Address
unsealLk sync.Mutex
noCommit bool
noPreCommit bool
rateLimit chan struct{}
precommitTasks chan workerCall
commitTasks chan workerCall
taskCtr uint64
remoteLk sync.Mutex
remoteCtr int
remotes map[int]*remote
remoteResults map[uint64]chan<- SealRes
addPieceWait int32
preCommitWait int32
commitWait int32
unsealWait int32
fsLk sync.Mutex //nolint: struckcheck
filesystem *fs // TODO: multi-fs support
stopping chan struct{}
}
type JsonRSPCO struct {
CommD []byte
CommR []byte
}
func (rspco *RawSealPreCommitOutput) ToJson() JsonRSPCO { func (rspco *RawSealPreCommitOutput) ToJson() JsonRSPCO {
return JsonRSPCO{ return JsonRSPCO{
CommD: rspco.CommD[:], CommD: rspco.CommD[:],
...@@ -106,21 +37,6 @@ func (rspco *JsonRSPCO) rspco() RawSealPreCommitOutput { ...@@ -106,21 +37,6 @@ func (rspco *JsonRSPCO) rspco() RawSealPreCommitOutput {
return out return out
} }
type SealRes struct {
Err string
GoErr error `json:"-"`
Proof []byte
Rspco JsonRSPCO
}
type remote struct {
lk sync.Mutex
sealTasks chan<- WorkerTask
busy uint64 // only for metrics
}
type Config struct { type Config struct {
SectorSize uint64 SectorSize uint64
Miner address.Address Miner address.Address
...@@ -289,126 +205,6 @@ func (sb *SectorBuilder) AcquireSectorId() (uint64, error) { ...@@ -289,126 +205,6 @@ func (sb *SectorBuilder) AcquireSectorId() (uint64, error) {
return id, nil return id, nil
} }
func (sb *SectorBuilder) AddPiece(pieceSize uint64, sectorId uint64, file io.Reader, existingPieceSizes []uint64) (PublicPieceInfo, error) {
fs := sb.filesystem
if err := fs.reserve(dataStaging, sb.ssize); err != nil {
return PublicPieceInfo{}, err
}
defer fs.free(dataStaging, sb.ssize)
atomic.AddInt32(&sb.addPieceWait, 1)
ret := sb.RateLimit()
atomic.AddInt32(&sb.addPieceWait, -1)
defer ret()
f, werr, err := toReadableFile(file, int64(pieceSize))
if err != nil {
return PublicPieceInfo{}, err
}
stagedFile, err := sb.stagedSectorFile(sectorId)
if err != nil {
return PublicPieceInfo{}, err
}
_, _, commP, err := sectorbuilder.WriteWithAlignment(f, pieceSize, stagedFile, existingPieceSizes)
if err != nil {
return PublicPieceInfo{}, err
}
if err := stagedFile.Close(); err != nil {
return PublicPieceInfo{}, err
}
if err := f.Close(); err != nil {
return PublicPieceInfo{}, err
}
return PublicPieceInfo{
Size: pieceSize,
CommP: commP,
}, werr()
}
func (sb *SectorBuilder) ReadPieceFromSealedSector(sectorID uint64, offset uint64, size uint64, ticket []byte, commD []byte) (io.ReadCloser, error) {
fs := sb.filesystem
if err := fs.reserve(dataUnsealed, sb.ssize); err != nil { // TODO: this needs to get smarter when we start supporting partial unseals
return nil, err
}
defer fs.free(dataUnsealed, sb.ssize)
atomic.AddInt32(&sb.unsealWait, 1)
// TODO: Don't wait if cached
ret := sb.RateLimit() // TODO: check perf, consider remote unseal worker
defer ret()
atomic.AddInt32(&sb.unsealWait, -1)
sb.unsealLk.Lock() // TODO: allow unsealing unrelated sectors in parallel
defer sb.unsealLk.Unlock()
cacheDir, err := sb.sectorCacheDir(sectorID)
if err != nil {
return nil, err
}
sealedPath, err := sb.SealedSectorPath(sectorID)
if err != nil {
return nil, err
}
unsealedPath := sb.unsealedSectorPath(sectorID)
// TODO: GC for those
// (Probably configurable count of sectors to be kept unsealed, and just
// remove last used one (or use whatever other cache policy makes sense))
f, err := os.OpenFile(unsealedPath, os.O_RDONLY, 0644)
if err != nil {
if !os.IsNotExist(err) {
return nil, err
}
var commd [CommLen]byte
copy(commd[:], commD)
var tkt [CommLen]byte
copy(tkt[:], ticket)
err = sectorbuilder.Unseal(sb.ssize,
PoRepProofPartitions,
cacheDir,
sealedPath,
unsealedPath,
sectorID,
addressToProverID(sb.Miner),
tkt,
commd)
if err != nil {
return nil, xerrors.Errorf("unseal failed: %w", err)
}
f, err = os.OpenFile(unsealedPath, os.O_RDONLY, 0644)
if err != nil {
return nil, err
}
}
if _, err := f.Seek(int64(offset), io.SeekStart); err != nil {
return nil, xerrors.Errorf("seek: %w", err)
}
lr := io.LimitReader(f, int64(size))
return &struct {
io.Reader
io.Closer
}{
Reader: lr,
Closer: f,
}, nil
}
func (sb *SectorBuilder) sealPreCommitRemote(call workerCall) (RawSealPreCommitOutput, error) { func (sb *SectorBuilder) sealPreCommitRemote(call workerCall) (RawSealPreCommitOutput, error) {
atomic.AddInt32(&sb.preCommitWait, -1) atomic.AddInt32(&sb.preCommitWait, -1)
...@@ -424,109 +220,6 @@ func (sb *SectorBuilder) sealPreCommitRemote(call workerCall) (RawSealPreCommitO ...@@ -424,109 +220,6 @@ func (sb *SectorBuilder) sealPreCommitRemote(call workerCall) (RawSealPreCommitO
} }
} }
func (sb *SectorBuilder) SealPreCommit(ctx context.Context, sectorID uint64, ticket SealTicket, pieces []PublicPieceInfo) (RawSealPreCommitOutput, error) {
fs := sb.filesystem
if err := fs.reserve(dataCache, sb.ssize); err != nil {
return RawSealPreCommitOutput{}, err
}
defer fs.free(dataCache, sb.ssize)
if err := fs.reserve(dataSealed, sb.ssize); err != nil {
return RawSealPreCommitOutput{}, err
}
defer fs.free(dataSealed, sb.ssize)
call := workerCall{
task: WorkerTask{
Type: WorkerPreCommit,
TaskID: atomic.AddUint64(&sb.taskCtr, 1),
SectorID: sectorID,
SealTicket: ticket,
Pieces: pieces,
},
ret: make(chan SealRes),
}
atomic.AddInt32(&sb.preCommitWait, 1)
select { // prefer remote
case sb.precommitTasks <- call:
return sb.sealPreCommitRemote(call)
default:
}
sb.checkRateLimit()
rl := sb.rateLimit
if sb.noPreCommit {
rl = make(chan struct{})
}
select { // use whichever is available
case sb.precommitTasks <- call:
return sb.sealPreCommitRemote(call)
case rl <- struct{}{}:
case <-ctx.Done():
return RawSealPreCommitOutput{}, ctx.Err()
}
atomic.AddInt32(&sb.preCommitWait, -1)
// local
defer func() {
<-sb.rateLimit
}()
cacheDir, err := sb.sectorCacheDir(sectorID)
if err != nil {
return RawSealPreCommitOutput{}, xerrors.Errorf("getting cache dir: %w", err)
}
sealedPath, err := sb.SealedSectorPath(sectorID)
if err != nil {
return RawSealPreCommitOutput{}, xerrors.Errorf("getting sealed sector path: %w", err)
}
e, err := os.OpenFile(sealedPath, os.O_RDWR|os.O_CREATE, 0644)
if err != nil {
return RawSealPreCommitOutput{}, xerrors.Errorf("ensuring sealed file exists: %w", err)
}
if err := e.Close(); err != nil {
return RawSealPreCommitOutput{}, err
}
var sum uint64
for _, piece := range pieces {
sum += piece.Size
}
ussize := UserBytesForSectorSize(sb.ssize)
if sum != ussize {
return RawSealPreCommitOutput{}, xerrors.Errorf("aggregated piece sizes don't match sector size: %d != %d (%d)", sum, ussize, int64(ussize-sum))
}
stagedPath := sb.StagedSectorPath(sectorID)
// TODO: context cancellation respect
rspco, err := sectorbuilder.SealPreCommit(
sb.ssize,
PoRepProofPartitions,
cacheDir,
stagedPath,
sealedPath,
sectorID,
addressToProverID(sb.Miner),
ticket.TicketBytes,
pieces,
)
if err != nil {
return RawSealPreCommitOutput{}, xerrors.Errorf("presealing sector %d (%s): %w", sectorID, stagedPath, err)
}
return RawSealPreCommitOutput(rspco), nil
}
func (sb *SectorBuilder) sealCommitRemote(call workerCall) (proof []byte, err error) { func (sb *SectorBuilder) sealCommitRemote(call workerCall) (proof []byte, err error) {
atomic.AddInt32(&sb.commitWait, -1) atomic.AddInt32(&sb.commitWait, -1)
...@@ -541,119 +234,13 @@ func (sb *SectorBuilder) sealCommitRemote(call workerCall) (proof []byte, err er ...@@ -541,119 +234,13 @@ func (sb *SectorBuilder) sealCommitRemote(call workerCall) (proof []byte, err er
} }
} }
func (sb *SectorBuilder) sealCommitLocal(sectorID uint64, ticket SealTicket, seed SealSeed, pieces []PublicPieceInfo, rspco RawSealPreCommitOutput) (proof []byte, err error) {
atomic.AddInt32(&sb.commitWait, -1)
defer func() {
<-sb.rateLimit
}()
cacheDir, err := sb.sectorCacheDir(sectorID)
if err != nil {
return nil, err
}
proof, err = sectorbuilder.SealCommit(
sb.ssize,
PoRepProofPartitions,
cacheDir,
sectorID,
addressToProverID(sb.Miner),
ticket.TicketBytes,
seed.TicketBytes,
pieces,
sectorbuilder.RawSealPreCommitOutput(rspco),
)
if err != nil {
log.Warn("StandaloneSealCommit error: ", err)
log.Warnf("sid:%d tkt:%v seed:%v, ppi:%v rspco:%v", sectorID, ticket, seed, pieces, rspco)
return nil, xerrors.Errorf("StandaloneSealCommit: %w", err)
}
return proof, nil
}
func (sb *SectorBuilder) SealCommit(ctx context.Context, sectorID uint64, ticket SealTicket, seed SealSeed, pieces []PublicPieceInfo, rspco RawSealPreCommitOutput) (proof []byte, err error) {
call := workerCall{
task: WorkerTask{
Type: WorkerCommit,
TaskID: atomic.AddUint64(&sb.taskCtr, 1),
SectorID: sectorID,
SealTicket: ticket,
Pieces: pieces,
SealSeed: seed,
Rspco: rspco,
},
ret: make(chan SealRes),
}
atomic.AddInt32(&sb.commitWait, 1)
select { // prefer remote
case sb.commitTasks <- call:
proof, err = sb.sealCommitRemote(call)
default:
sb.checkRateLimit()
rl := sb.rateLimit
if sb.noCommit {
rl = make(chan struct{})
}
select { // use whichever is available
case sb.commitTasks <- call:
proof, err = sb.sealCommitRemote(call)
case rl <- struct{}{}:
proof, err = sb.sealCommitLocal(sectorID, ticket, seed, pieces, rspco)
case <-ctx.Done():
return nil, ctx.Err()
}
}
if err != nil {
return nil, xerrors.Errorf("commit: %w", err)
}
return proof, nil
}
func (sb *SectorBuilder) ComputeElectionPoSt(sectorInfo SortedPublicSectorInfo, challengeSeed []byte, winners []EPostCandidate) ([]byte, error) {
if len(challengeSeed) != CommLen {
return nil, xerrors.Errorf("given challenge seed was the wrong length: %d != %d", len(challengeSeed), CommLen)
}
var cseed [CommLen]byte
copy(cseed[:], challengeSeed)
privsects, err := sb.pubSectorToPriv(sectorInfo, nil) // TODO: faults
if err != nil {
return nil, err
}
proverID := addressToProverID(sb.Miner)
return sectorbuilder.GeneratePoSt(sb.ssize, proverID, privsects, cseed, winners)
}
func (sb *SectorBuilder) GenerateEPostCandidates(sectorInfo SortedPublicSectorInfo, challengeSeed [CommLen]byte, faults []uint64) ([]EPostCandidate, error) {
privsectors, err := sb.pubSectorToPriv(sectorInfo, faults)
if err != nil {
return nil, err
}
challengeCount := ElectionPostChallengeCount(uint64(len(sectorInfo.Values())), uint64(len(faults)))
proverID := addressToProverID(sb.Miner)
return sectorbuilder.GenerateCandidates(sb.ssize, proverID, challengeSeed, challengeCount, privsectors)
}
func (sb *SectorBuilder) pubSectorToPriv(sectorInfo SortedPublicSectorInfo, faults []uint64) (SortedPrivateSectorInfo, error) { func (sb *SectorBuilder) pubSectorToPriv(sectorInfo SortedPublicSectorInfo, faults []uint64) (SortedPrivateSectorInfo, error) {
fmap := map[uint64]struct{}{} fmap := map[uint64]struct{}{}
for _, fault := range faults { for _, fault := range faults {
fmap[fault] = struct{}{} fmap[fault] = struct{}{}
} }
var out []sectorbuilder.PrivateSectorInfo var out []ffi.PrivateSectorInfo
for _, s := range sectorInfo.Values() { for _, s := range sectorInfo.Values() {
if _, faulty := fmap[s.SectorID]; faulty { if _, faulty := fmap[s.SectorID]; faulty {
continue continue
...@@ -669,36 +256,14 @@ func (sb *SectorBuilder) pubSectorToPriv(sectorInfo SortedPublicSectorInfo, faul ...@@ -669,36 +256,14 @@ func (sb *SectorBuilder) pubSectorToPriv(sectorInfo SortedPublicSectorInfo, faul
return SortedPrivateSectorInfo{}, xerrors.Errorf("getting sealed path for sector %d: %w", s.SectorID, err) return SortedPrivateSectorInfo{}, xerrors.Errorf("getting sealed path for sector %d: %w", s.SectorID, err)
} }
out = append(out, sectorbuilder.PrivateSectorInfo{ out = append(out, ffi.PrivateSectorInfo{
SectorID: s.SectorID, SectorID: s.SectorID,
CommR: s.CommR, CommR: s.CommR,
CacheDirPath: cachePath, CacheDirPath: cachePath,
SealedSectorPath: sealedPath, SealedSectorPath: sealedPath,
}) })
} }
return newSortedPrivateSectorInfo(out), nil return ffi.NewSortedPrivateSectorInfo(out...), nil
}
func (sb *SectorBuilder) GenerateFallbackPoSt(sectorInfo SortedPublicSectorInfo, challengeSeed [CommLen]byte, faults []uint64) ([]EPostCandidate, []byte, error) {
privsectors, err := sb.pubSectorToPriv(sectorInfo, faults)
if err != nil {
return nil, nil, err
}
challengeCount := fallbackPostChallengeCount(uint64(len(sectorInfo.Values())), uint64(len(faults)))
proverID := addressToProverID(sb.Miner)
candidates, err := sectorbuilder.GenerateCandidates(sb.ssize, proverID, challengeSeed, challengeCount, privsectors)
if err != nil {
return nil, nil, err
}
proof, err := sectorbuilder.GeneratePoSt(sb.ssize, proverID, privsectors, challengeSeed, candidates)
return candidates, proof, err
}
func (sb *SectorBuilder) Stop() {
close(sb.stopping)
} }
func fallbackPostChallengeCount(sectors uint64, faults uint64) uint64 { func fallbackPostChallengeCount(sectors uint64, faults uint64) uint64 {
...@@ -803,3 +368,7 @@ func migrateFile(from, to string, symlink bool) error { ...@@ -803,3 +368,7 @@ func migrateFile(from, to string, symlink bool) error {
return dcopy.Copy(from, to) return dcopy.Copy(from, to)
} }
func (sb *SectorBuilder) Stop() {
close(sb.stopping)
}
//+build cgo
package sectorbuilder package sectorbuilder
import ( import (
...@@ -10,11 +12,14 @@ import ( ...@@ -10,11 +12,14 @@ import (
"github.com/filecoin-project/go-address" "github.com/filecoin-project/go-address"
) )
var _ Verifier = ProofVerifier
func (sb *SectorBuilder) SectorSize() uint64 { func (sb *SectorBuilder) SectorSize() uint64 {
return sb.ssize return sb.ssize
} }
type proofVerifier struct {} type proofVerifier struct{}
var ProofVerifier = proofVerifier{} var ProofVerifier = proofVerifier{}
var UserBytesForSectorSize = sectorbuilder.GetMaxUserBytesPerStagedSector var UserBytesForSectorSize = sectorbuilder.GetMaxUserBytesPerStagedSector
......
package sectorbuilder
import (
"sync"
ffi "github.com/filecoin-project/filecoin-ffi"
"github.com/filecoin-project/go-address"
"github.com/ipfs/go-datastore"
)
type SortedPublicSectorInfo = ffi.SortedPublicSectorInfo
type SortedPrivateSectorInfo = ffi.SortedPrivateSectorInfo
type SealTicket = ffi.SealTicket
type SealSeed = ffi.SealSeed
type SealPreCommitOutput = ffi.SealPreCommitOutput
type SealCommitOutput = ffi.SealCommitOutput
type PublicPieceInfo = ffi.PublicPieceInfo
type RawSealPreCommitOutput ffi.RawSealPreCommitOutput
type EPostCandidate = ffi.Candidate
const CommLen = ffi.CommitmentBytesLen
type WorkerCfg struct {
NoPreCommit bool
NoCommit bool
// TODO: 'cost' info, probably in terms of sealing + transfer speed
}
type SectorBuilder struct {
ds datastore.Batching
idLk sync.Mutex
ssize uint64
lastID uint64
Miner address.Address
unsealLk sync.Mutex
noCommit bool
noPreCommit bool
rateLimit chan struct{}
precommitTasks chan workerCall
commitTasks chan workerCall
taskCtr uint64
remoteLk sync.Mutex
remoteCtr int
remotes map[int]*remote
remoteResults map[uint64]chan<- SealRes
addPieceWait int32
preCommitWait int32
commitWait int32
unsealWait int32
fsLk sync.Mutex //nolint: struckcheck
filesystem *fs // TODO: multi-fs support
stopping chan struct{}
}
type remote struct {
lk sync.Mutex
sealTasks chan<- WorkerTask
busy uint64 // only for metrics
}
type JsonRSPCO struct {
CommD []byte
CommR []byte
}
type SealRes struct {
Err string
GoErr error `json:"-"`
Proof []byte
Rspco JsonRSPCO
}
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment