Unverified commit 82965a74, authored by Erin Swenson-Healey and committed by GitHub

replace old go-sectorbuilder with lotus' sectorbuilder + paramfetch (#61)



* deals: Sending initial proposal works

* deals: Almost sealing client data

* deals: Use temp files for AddPiece

* deals: Upstream bitswap changes

* pond: Basic message display in Block window

* move poller to sector store

* sectorstore: Address review feedback

* storageminer: Initial PaymentVerify implementation

* Wire up more proper ticket generation and verification logic

* Replace most marshaling with codegen

* Command to list sealed blocks

* update sectorbuilder

* Import proofs for paramfetch

* Extract go-fil-proofs

* Fix sectorbuilder poRepProofPartitions

* retrieval: Make types more spec compliant

* Simpler paramfetch

* Merge commit 'c57c47ffb5695f7536306c4f3ab05c9a98adb1c6' as 'extern/rleplus'

* Add rleplus

License: MIT
Signed-off-by: Jakub Sztandera <kubuxu@protonmail.ch>

* Update sectorbuilder

License: MIT
Signed-off-by: Jakub Sztandera <kubuxu@protonmail.ch>

* Update sectorbuilder

License: MIT
Signed-off-by: Jakub Sztandera <kubuxu@protonmail.ch>

* Sector Commitment tracker

* jsonrpc: include method name in error log

* node: Basic graceful shutdown

* repo: Close datastore in Close

* storageminer: Better context handling

* cleaning up a few types

* Rough PoSt method

License: MIT
Signed-off-by: Jakub Sztandera <kubuxu@protonmail.ch>

* update go-sectorbuilder

* use new sectorbuilder file interfaces

* fix tests

* Almost working new post code

* jsonrpc: Channel buffering

* fix websocket closing

* pass those tests

* fix websocket closing again

* Devnet 3; Builtin bootstrap; NAT Port Map

* remove VDFs from tickets

* use faster bls code

* Update filebeat

Change the logging of the RPC buffer, as I want to set up an alert for when it goes too high

License: MIT
Signed-off-by: Jakub Sztandera <kubuxu@protocol.ai>

* Add more info to storage-miner info command output

* remove empty const block

* Update build scripts

Remove outdated scripts

License: MIT
Signed-off-by: Jakub Sztandera <kubuxu@protocol.ai>

* Cleanup imports after rename

License: MIT
Signed-off-by: Jakub Sztandera <kubuxu@protocol.ai>

* Cleanup imports after rename

License: MIT
Signed-off-by: Jakub Sztandera <kubuxu@protocol.ai>

* dont hang requests if websockets server shuts down

* REST file import endpoint

* on chain deals: Get things to actually run!

* on chain deals: Expose more chain state in pond

* on chain deals: Deals make it to the chain

* on chain deals: Put dealIDs in CommitSector messages

* WIP: updating to new proofs code

* WIP: updating to new proofs code

* should use the new parameters

* very basic sector seal scheduling

* Fix TestSealAndVerify

* storageminer: Handle uncommitted sectors on start

* storageminer: Restart sealing on restart

* storageminer: More correct listing of sealed sectors

* fix panic when close miner

* Update sectorbuilder, v15 params

* WIP Interactive PoRep

* Some more progress on interactive porep

* use reflect select

* move select

* specific ipfs gateway

* use IPFS_GATEWAY

* more refactoring for interactive porep scheduling

* Fix sectorbuilder.VerifySeal

* Move statestore to lib

* Get interactive porep sector sealing mostly working

* Get interactive porep sector sealing mostly working

* Strip unused functionality from sectorstore

* statestore: Use reflect for mutators

* statestore: More generic keys

* Use state store for sectors

* Some smaller fixes

* INTERACTIVE POREP IS ALIVE

* Update sectorbuilder

* Update sectorbuilder

* Put WorkerThreads on sectorbuilder.Config

* rate-limit some sectorbuilder ops

* Track down all the uses of cboripld and eliminate them

* Update go-sectorbuilder again

* events: Plumb context to callbacks

* fix retrieval protocol error by wrapping stream in peeker

* WIP fixing tests

* Fix statestore.List

* Mostly fix deals

* Improve errors around deal handling

* deals: Set correct Refs

* Create filler deals

* WIP: trying to write a test to reproduce the storage deal error

* Add method to query latest deal state

* fail test if deal errors

* deals: cleanup client state machine

* cborrpc -> cborutil

* Make multiple deals per sector almost work

* update go-sectorbuilder

* sectorbuilder: use standalone methods

* sectorbuilder: Also test PoSt

* sectorbuilder: Always create directories

* Wip fixing a thing

* sectorbuilder: Use StandaloneWriteWithAlignment

* Storage miner API improvements

* keep track of last used sector id across restarts

* Use the same dir in TestAcquireID

* padreader: Some more testcases

* sectorbuilder: Call destroy in DI module

* Update go-sectorbuilder with gpu fixes

* sectorbuilder: apply some review suggestions

* Test to reproduce post error after restart

* Update sectorbuilder with a fix

* Update sectorbuilder

* WorkerCount on storageminer config

* storageminer: Throttle GeneratePieceCommitment in storeGarbage

* more tracing spans

* fix tests and add some more trace attributes

* Skip slow tests

License: MIT
Signed-off-by: Jakub Sztandera <kubuxu@protocol.ai>

* Rename to --include-test-params

License: MIT
Signed-off-by: Jakub Sztandera <kubuxu@protocol.ai>

* wip

* parallel sectorbuilder test

* sectorbuilder: Call AcquireSectorId in sync

* Skip sectorbuilder tests on slow hardware

* StateAPI: account for slashing in StateMinerPower

* sectorbuilder: open FD later in AddPiece

* sectorbuilder: Drop some unused functions

* wip remote sectorbuilder workers

* remote-worker: wire up storage miner endpoints

* support remote SealPreCommit

* Stats for remote workers

* Working remote PreCommit

* WIP remote sector CommitSeal

* WIP: election post restructuring

* WIP: election post restructuring

* fix rspco serialization

* Switch to xerrors

License: MIT
Signed-off-by: Jakub Sztandera <kubuxu@protocol.ai>

* Add lotus-gen, rewire genesis mining

* Add lotus-gen, rewire genesis mining

* More correct challengeCount calc

* WIP getting post in sectorbuilder_test to work

* use the correct sector sizes in places

* sectorbuilder: Measure things in TestSealAndVerify

* WIP trying to get election post to compute

* sectorbuilder: Drop stateful sectorbuilder refs

* sync: correct vrfBase for ticket check

* Copy over last sector ID key when migrating sectorbuilder

* replace go-bls-sigs and go-sectorbuilder with filecoin-ffi

- remove old submodules and add new submodule
- update build tooling to consume new unified static library
- update consumers of old libs to use new package

* replace go-bls-sigs and go-sectorbuilder with filecoin-ffi

- remove old submodules and add new submodule
- update build tooling to consume new unified static library
- update consumers of old libs to use new package

* update filecoin-ffi to v18 params

* update filecoin-ffi to v18 params

* More v18 updates

* v19 parameters

* filecoin-ffi master

* filecoin-ffi master

* WIP: uncomment windowed post code, try to make it work

* actors: Fallback post progress

* storageminer: New fallback post scheduler

* Use ProvingSet for GetSectorsForElectionPost

* Some fixes and dev utils

* seal-worker: Handle cache

* Rework miner test setups to fix tests

* self review: some cleanup

* Fix unsealing, sector based data refs

* deals: Correctly set deal ID in provider states

* actually set unsealed path in sectorbuilder

* Bunch of lint fixes

* use challengeCount as sampleRate in IsTicketWinner

* Update filecoin-ffi

* Update filecoin-ffi

* Update filecoin-ffi

* worker: Use system tar for moving cache around

* worker: Use system tar for moving cache around

* worker: Fix remaining bugs

* paramfetch: Only pull necessary params

* more staticcheck!

* Update filecoin-ffi

* sectorbuilder: update PoRepProofPartitions

* there is no real correlation between challenge count and len(winners)

* Allow no local sectorbuilder workers

* Fix AddPiece with disabled local workers

* Pre-sealing holes

License: MIT
Signed-off-by: Jakub Sztandera <kubuxu@protocol.ai>

* Update filecoin-ffi

* seed: Trim cache

* Fix tests and CircleCI, and make the UX nicer
License: MIT
Signed-off-by: Jakub Sztandera <kubuxu@protocol.ai>

* flag blocks that are received too late

* Add lazy RLE+ decoding

License: MIT
Signed-off-by: Jakub Sztandera <kubuxu@protonmail.ch>

* More iterative algorithms

 - Add RunIterator and decoder from RLE
 - Add BitIterator and BitsFromRuns
 - Add BitsFromSlice
 - Add RunsFromBits

License: MIT
Signed-off-by: Jakub Sztandera <kubuxu@protonmail.ch>

* Improve bitvector performance

License: MIT
Signed-off-by: Jakub Sztandera <kubuxu@protonmail.ch>

* Improve benchmarks and fix bitvector iterator

License: MIT
Signed-off-by: Jakub Sztandera <kubuxu@protonmail.ch>

* Add rle encoder

License: MIT
Signed-off-by: Jakub Sztandera <kubuxu@protonmail.ch>

* Optimize and start wrapping it up

License: MIT
Signed-off-by: Jakub Sztandera <kubuxu@protonmail.ch>

* Remove old bitvector

License: MIT
Signed-off-by: Jakub Sztandera <kubuxu@protonmail.ch>

* Improve complex code and comment it

License: MIT
Signed-off-by: Jakub Sztandera <kubuxu@protonmail.ch>

* Replace rleplus with rlepluslazy

License: MIT
Signed-off-by: Jakub Sztandera <kubuxu@protocol.ai>

* Fix typo in overflow check

License: MIT
Signed-off-by: Jakub Sztandera <kubuxu@protocol.ai>

* Some cleanup

License: MIT
Signed-off-by: Jakub Sztandera <kubuxu@protocol.ai>

* sectorbuilder: Allow restricting task types

License: MIT
Signed-off-by: Jakub Sztandera <kubuxu@protocol.ai>

* sectorbuilder: Allow restricting task types

License: MIT
Signed-off-by: Jakub Sztandera <kubuxu@protocol.ai>

* Update to correct version

License: MIT
Signed-off-by: Jakub Sztandera <kubuxu@protocol.ai>

* Close files in ExtractTar

* implement sector dir aggregator

* update ffi

* use that nice function i wrote

* this will pretty much always be nil

* support copying directories

* use a package

* Add short tests

License: MIT
Signed-off-by: Jakub Sztandera <kubuxu@protocol.ai>

* Move api struct to a separate pkg

* fix target for ePoSt IsTicketWinner fn

License: MIT
Signed-off-by: Jakub Sztandera <kubuxu@protocol.ai>

* fix sync tests

* Update FFI

* add option to symlink to presealed sectors

* fixup

* sectorbuilder: Fix proving on RO filesystem

* Update filecoin-ffi

* use actual symlink flag

* sectors: Handle sector state reload errors more gracefully

* Use filecoin-ffi master

* Update ffi to f261762

* sectorbuilder: check free space before creating sectors

* sectorbuilder: fs: address review

* fix(sectorbuilder): always cast fsstat.Bsize

fixes a compilation issue under macOS

* sectorbuilder: fix getpath

* sectorbuilder: Improve not enough space error

* circle: buildall on macos

* Wire up faults in fPoSt

* tear the world asunder

* temporarily move build into lib to prepare for extraction

* consume sectorbuilder from lotus

* port sectorbuilder from lotus

* downgrade to go-datastore 0.1.1 to match lotus
Co-authored-by: Łukasz Magiera <magik6k@users.noreply.github.com>
Co-authored-by: Whyrusleeping <why@ipfs.io>
Co-authored-by: Jakub Sztandera <kubuxu@protonmail.ch>
Co-authored-by: Frank <wholery@163.com>
Co-authored-by: Jack Yao <yaoh.cn@gmail.com>
Co-authored-by: Henri <3359083+sternhenri@users.noreply.github.com>
Co-authored-by: Caesar Wang <dtynn@163.com>
Co-authored-by: Friedel Ziegelmayer <me@dignifiedquire.com>
parent 4c9919a1
package sectorbuilder
// /////
// Proofs
// 1 / n
const SectorChallengeRatioDiv = 25
const MaxFallbackPostChallengeCount = 10
// extracted from lotus/chain/types/blockheader
func ElectionPostChallengeCount(sectors uint64, faults int) uint64 {
if sectors == 0 {
return 0
}
// ceil(sectors / SectorChallengeRatioDiv)
return (sectors-uint64(faults)-1)/SectorChallengeRatioDiv + 1
}
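// Illustrative only (not part of this change): with SectorChallengeRatioDiv = 25,
// a proving set of 100 sectors with 2 faults yields
// ElectionPostChallengeCount(100, 2) = (100-2-1)/25 + 1 = 4 challenges, i.e.
// ceil(98/25), while an empty proving set yields 0.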
package sectorbuilder
import (
"context"
"golang.org/x/xerrors"
)
type WorkerTaskType int
const (
WorkerPreCommit WorkerTaskType = iota
WorkerCommit
)
type WorkerTask struct {
Type WorkerTaskType
TaskID uint64
SectorID uint64
// preCommit
SealTicket SealTicket
Pieces []PublicPieceInfo
// commit
SealSeed SealSeed
Rspco RawSealPreCommitOutput
}
type workerCall struct {
task WorkerTask
ret chan SealRes
}
func (sb *SectorBuilder) AddWorker(ctx context.Context, cfg WorkerCfg) (<-chan WorkerTask, error) {
sb.remoteLk.Lock()
defer sb.remoteLk.Unlock()
taskCh := make(chan WorkerTask)
r := &remote{
sealTasks: taskCh,
busy: 0,
}
sb.remoteCtr++
sb.remotes[sb.remoteCtr] = r
go sb.remoteWorker(ctx, r, cfg)
return taskCh, nil
}
func (sb *SectorBuilder) returnTask(task workerCall) {
var ret chan workerCall
switch task.task.Type {
case WorkerPreCommit:
ret = sb.precommitTasks
case WorkerCommit:
ret = sb.commitTasks
default:
log.Error("unknown task type", task.task.Type)
}
go func() {
select {
case ret <- task:
case <-sb.stopping:
return
}
}()
}
func (sb *SectorBuilder) remoteWorker(ctx context.Context, r *remote, cfg WorkerCfg) {
defer log.Warn("Remote worker disconnected")
defer func() {
sb.remoteLk.Lock()
defer sb.remoteLk.Unlock()
for i, vr := range sb.remotes {
if vr == r {
delete(sb.remotes, i)
return
}
}
}()
precommits := sb.precommitTasks
if cfg.NoPreCommit {
precommits = nil
}
commits := sb.commitTasks
if cfg.NoCommit {
commits = nil
}
for {
select {
case task := <-commits:
sb.doTask(ctx, r, task)
case task := <-precommits:
sb.doTask(ctx, r, task)
case <-ctx.Done():
return
case <-sb.stopping:
return
}
r.lk.Lock()
r.busy = 0
r.lk.Unlock()
}
}
func (sb *SectorBuilder) doTask(ctx context.Context, r *remote, task workerCall) {
resCh := make(chan SealRes)
sb.remoteLk.Lock()
sb.remoteResults[task.task.TaskID] = resCh
sb.remoteLk.Unlock()
// send the task
select {
case r.sealTasks <- task.task:
case <-ctx.Done():
sb.returnTask(task)
return
}
r.lk.Lock()
r.busy = task.task.TaskID
r.lk.Unlock()
// wait for the result
select {
case res := <-resCh:
// send the result back to the caller
select {
case task.ret <- res:
case <-ctx.Done():
return
case <-sb.stopping:
return
}
case <-ctx.Done():
log.Warnf("context expired while waiting for task %d (sector %d): %s", task.task.TaskID, task.task.SectorID, ctx.Err())
return
case <-sb.stopping:
return
}
}
func (sb *SectorBuilder) TaskDone(ctx context.Context, task uint64, res SealRes) error {
sb.remoteLk.Lock()
rres, ok := sb.remoteResults[task]
if ok {
delete(sb.remoteResults, task)
}
sb.remoteLk.Unlock()
if !ok {
return xerrors.Errorf("task %d not found", task)
}
select {
case rres <- res:
return nil
case <-ctx.Done():
return ctx.Err()
}
}
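// The sketch below is illustrative only and not part of this change: it shows how a
// worker could consume the task channel returned by AddWorker and hand results back
// through TaskDone. The actual sealing calls are elided; only the control flow
// between the sectorbuilder and the worker is shown.
func exampleWorkerLoop(ctx context.Context, sb *SectorBuilder, cfg WorkerCfg) error {
	tasks, err := sb.AddWorker(ctx, cfg)
	if err != nil {
		return err
	}
	for {
		select {
		case task := <-tasks:
			var res SealRes
			switch task.Type {
			case WorkerPreCommit:
				// run the pre-commit on the worker, then fill res.Rspco (or res.Err)
			case WorkerCommit:
				// run the commit on the worker, then fill res.Proof (or res.Err)
			}
			if err := sb.TaskDone(ctx, task.TaskID, res); err != nil {
				return err
			}
		case <-ctx.Done():
			return ctx.Err()
		}
	}
}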
Subproject commit c42f34b41a306cc7aca7709dd379bd307ab2b3dd
package sectorbuilder
import (
"io/ioutil"
"os"
"path/filepath"
sectorbuilder "github.com/filecoin-project/filecoin-ffi"
"golang.org/x/xerrors"
)
type Fault struct {
SectorID uint64
Err error
}
func (sb *SectorBuilder) Scrub(sectorSet sectorbuilder.SortedPublicSectorInfo) []*Fault {
var faults []*Fault
for _, sector := range sectorSet.Values() {
err := sb.checkSector(sector.SectorID)
if err != nil {
faults = append(faults, &Fault{SectorID: sector.SectorID, Err: err})
}
}
return faults
}
func (sb *SectorBuilder) checkSector(sectorID uint64) error {
cache, err := sb.sectorCacheDir(sectorID)
if err != nil {
return xerrors.Errorf("getting sector cache dir: %w", err)
}
if err := assertFile(filepath.Join(cache, "p_aux"), 96, 96); err != nil {
return err
}
if err := assertFile(filepath.Join(cache, "sc-01-data-tree-r-last.dat"), (2*sb.ssize)-32, (2*sb.ssize)-32); err != nil {
return err
}
// TODO: better validate this
if err := assertFile(filepath.Join(cache, "t_aux"), 100, 32000); err != nil { // TODO: what should this actually be?
return err
}
dent, err := ioutil.ReadDir(cache)
if err != nil {
return xerrors.Errorf("reading cache dir %s", cache)
}
if len(dent) != 3 {
return xerrors.Errorf("found %d files in %s, expected 3", len(dent), cache)
}
sealed, err := sb.SealedSectorPath(sectorID)
if err != nil {
return xerrors.Errorf("getting sealed sector path: %w", err)
}
if err := assertFile(filepath.Join(sealed), sb.ssize, sb.ssize); err != nil {
return err
}
return nil
}
func assertFile(path string, minSz uint64, maxSz uint64) error {
st, err := os.Stat(path)
if err != nil {
return xerrors.Errorf("stat %s: %w", path, err)
}
if st.IsDir() {
return xerrors.Errorf("expected %s to be a regular file", path)
}
if uint64(st.Size()) < minSz || uint64(st.Size()) > maxSz {
return xerrors.Errorf("%s wasn't within size bounds, expected %d < f < %d, got %d", minSz, maxSz, st.Size())
}
return nil
}
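// Illustrative only (not part of this change): Scrub can be run over the current
// proving set to spot missing or truncated sector files before attempting a PoSt.
func exampleScrub(sb *SectorBuilder, provingSet SortedPublicSectorInfo) {
	for _, fault := range sb.Scrub(provingSet) {
		log.Warnf("sector %d failed scrub: %s", fault.SectorID, fault.Err)
	}
}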
package sealed_sector_health
// Health represents the healthiness of a sector managed by a
// sector builder.
type Health int
const (
Unknown Health = iota
Ok // everything is fine
InvalidChecksum // sector exists, but checksum is invalid
InvalidLength // sector exists, but length is incorrect
Missing // sector no longer exists
)
var labels = [...]string{
"Unknown",
"Ok",
"InvalidChecksum",
"InvalidLength",
"Missing",
}
func (el Health) String() string {
return labels[el]
}
package sealing_state
// State communicates the state of the sector with respect to sealing.
type State int
const (
Unknown State = iota
AcceptingPieces // sector is still accepting user data
Committed // sector has been committed to a ticket and seed
Committing // sector is being committed
CommittingPaused // sector was committing, but now paused
Failed // sector failed during pre-commit or commit
FullyPacked // sector is no longer accepting pieces; is fully packed
PreCommitted // sector has been pre-committed to a ticket
PreCommitting // sector is pre-committing
PreCommittingPaused // sector was paused during pre-commit
)
var Labels = [...]string{
"Unknown",
"AcceptingPieces",
"Committed",
"Committing",
"CommittingPaused",
"Failed",
"FullyPacked",
"PreCommitted",
"PreCommitting",
"PreCommittingPaused",
}
func (el State) String() string {
return Labels[el]
}
package sectorbuilder
import (
"fmt"
"io"
"io/ioutil"
"os"
"path/filepath"
"strconv"
"sync"
"sync/atomic"
sectorbuilder "github.com/filecoin-project/filecoin-ffi"
"github.com/ipfs/go-datastore"
logging "github.com/ipfs/go-log"
dcopy "github.com/otiai10/copy"
"golang.org/x/xerrors"
"github.com/filecoin-project/go-address"
)
const PoStReservedWorkers = 1
const PoRepProofPartitions = 10
var lastSectorIdKey = datastore.NewKey("/last")
var log = logging.Logger("sectorbuilder")
type SortedPublicSectorInfo = sectorbuilder.SortedPublicSectorInfo
type SortedPrivateSectorInfo = sectorbuilder.SortedPrivateSectorInfo
type SealTicket = sectorbuilder.SealTicket
type SealSeed = sectorbuilder.SealSeed
type SealPreCommitOutput = sectorbuilder.SealPreCommitOutput
type SealCommitOutput = sectorbuilder.SealCommitOutput
type PublicPieceInfo = sectorbuilder.PublicPieceInfo
type RawSealPreCommitOutput sectorbuilder.RawSealPreCommitOutput
type EPostCandidate = sectorbuilder.Candidate
const CommLen = sectorbuilder.CommitmentBytesLen
type WorkerCfg struct {
NoPreCommit bool
NoCommit bool
// TODO: 'cost' info, probably in terms of sealing + transfer speed
}
type SectorBuilder struct {
ds datastore.Batching
idLk sync.Mutex
ssize uint64
lastID uint64
Miner address.Address
unsealLk sync.Mutex
noCommit bool
noPreCommit bool
rateLimit chan struct{}
precommitTasks chan workerCall
commitTasks chan workerCall
taskCtr uint64
remoteLk sync.Mutex
remoteCtr int
remotes map[int]*remote
remoteResults map[uint64]chan<- SealRes
addPieceWait int32
preCommitWait int32
commitWait int32
unsealWait int32
fsLk sync.Mutex //nolint: structcheck
filesystem *fs // TODO: multi-fs support
stopping chan struct{}
}
type JsonRSPCO struct {
CommD []byte
CommR []byte
}
func (rspco *RawSealPreCommitOutput) ToJson() JsonRSPCO {
return JsonRSPCO{
CommD: rspco.CommD[:],
CommR: rspco.CommR[:],
}
}
func (rspco *JsonRSPCO) rspco() RawSealPreCommitOutput {
var out RawSealPreCommitOutput
copy(out.CommD[:], rspco.CommD)
copy(out.CommR[:], rspco.CommR)
return out
}
type SealRes struct {
Err string
GoErr error `json:"-"`
Proof []byte
Rspco JsonRSPCO
}
type remote struct {
lk sync.Mutex
sealTasks chan<- WorkerTask
busy uint64 // only for metrics
}
type Config struct {
SectorSize uint64
Miner address.Address
WorkerThreads uint8
FallbackLastID uint64
NoCommit bool
NoPreCommit bool
Dir string
_ struct{} // guard against nameless init
}
func New(cfg *Config, ds datastore.Batching) (*SectorBuilder, error) {
if cfg.WorkerThreads < PoStReservedWorkers {
return nil, xerrors.Errorf("minimum worker threads is %d, specified %d", PoStReservedWorkers, cfg.WorkerThreads)
}
var lastUsedID uint64
b, err := ds.Get(lastSectorIdKey)
switch err {
case nil:
i, err := strconv.ParseInt(string(b), 10, 64)
if err != nil {
return nil, err
}
lastUsedID = uint64(i)
case datastore.ErrNotFound:
lastUsedID = cfg.FallbackLastID
default:
return nil, err
}
rlimit := cfg.WorkerThreads - PoStReservedWorkers
sealLocal := rlimit > 0
if rlimit == 0 {
rlimit = 1
}
sb := &SectorBuilder{
ds: ds,
ssize: cfg.SectorSize,
lastID: lastUsedID,
filesystem: openFs(cfg.Dir),
Miner: cfg.Miner,
noPreCommit: cfg.NoPreCommit || !sealLocal,
noCommit: cfg.NoCommit || !sealLocal,
rateLimit: make(chan struct{}, rlimit),
taskCtr: 1,
precommitTasks: make(chan workerCall),
commitTasks: make(chan workerCall),
remoteResults: map[uint64]chan<- SealRes{},
remotes: map[int]*remote{},
stopping: make(chan struct{}),
}
if err := sb.filesystem.init(); err != nil {
return nil, xerrors.Errorf("initializing sectorbuilder filesystem: %w", err)
}
return sb, nil
}
func NewStandalone(cfg *Config) (*SectorBuilder, error) {
sb := &SectorBuilder{
ds: nil,
ssize: cfg.SectorSize,
Miner: cfg.Miner,
filesystem: openFs(cfg.Dir),
taskCtr: 1,
remotes: map[int]*remote{},
rateLimit: make(chan struct{}, cfg.WorkerThreads),
stopping: make(chan struct{}),
}
if err := sb.filesystem.init(); err != nil {
return nil, xerrors.Errorf("initializing sectorbuilder filesystem: %w", err)
}
return sb, nil
}
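// Illustrative only (not part of this change): a minimal local configuration.
// The sector size, thread count, and directory below are placeholders.
func exampleNew(ds datastore.Batching, miner address.Address) (*SectorBuilder, error) {
	return New(&Config{
		SectorSize:    1024, // test sector size; real networks use much larger sectors
		Miner:         miner,
		WorkerThreads: 2, // one PoSt-reserved thread plus one local sealing thread
		Dir:           "/tmp/sectorbuilder", // placeholder path
	}, ds)
}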
func (sb *SectorBuilder) checkRateLimit() {
if cap(sb.rateLimit) == len(sb.rateLimit) {
log.Warn("rate-limiting local sectorbuilder call")
}
}
func (sb *SectorBuilder) RateLimit() func() {
sb.checkRateLimit()
sb.rateLimit <- struct{}{}
return func() {
<-sb.rateLimit
}
}
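// Illustrative only (not part of this change): RateLimit hands out one slot from the
// local worker pool and blocks when all slots are busy; the returned func must be
// called (typically deferred) to release the slot, even on error paths.
func exampleRateLimited(sb *SectorBuilder, work func() error) error {
	done := sb.RateLimit()
	defer done()
	return work()
}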
type WorkerStats struct {
LocalFree int
LocalReserved int
LocalTotal int
// todo: post in progress
RemotesTotal int
RemotesFree int
AddPieceWait int
PreCommitWait int
CommitWait int
UnsealWait int
}
func (sb *SectorBuilder) WorkerStats() WorkerStats {
sb.remoteLk.Lock()
defer sb.remoteLk.Unlock()
remoteFree := len(sb.remotes)
for _, r := range sb.remotes {
if r.busy > 0 {
remoteFree--
}
}
return WorkerStats{
LocalFree: cap(sb.rateLimit) - len(sb.rateLimit),
LocalReserved: PoStReservedWorkers,
LocalTotal: cap(sb.rateLimit) + PoStReservedWorkers,
RemotesTotal: len(sb.remotes),
RemotesFree: remoteFree,
AddPieceWait: int(atomic.LoadInt32(&sb.addPieceWait)),
PreCommitWait: int(atomic.LoadInt32(&sb.preCommitWait)),
CommitWait: int(atomic.LoadInt32(&sb.commitWait)),
UnsealWait: int(atomic.LoadInt32(&sb.unsealWait)),
}
}
func addressToProverID(a address.Address) [32]byte {
var proverId [32]byte
copy(proverId[:], a.Payload())
return proverId
}
func (sb *SectorBuilder) AcquireSectorId() (uint64, error) {
sb.idLk.Lock()
defer sb.idLk.Unlock()
sb.lastID++
id := sb.lastID
err := sb.ds.Put(lastSectorIdKey, []byte(fmt.Sprint(id)))
if err != nil {
return 0, err
}
return id, nil
}
func (sb *SectorBuilder) AddPiece(pieceSize uint64, sectorId uint64, file io.Reader, existingPieceSizes []uint64) (PublicPieceInfo, error) {
fs := sb.filesystem
if err := fs.reserve(dataStaging, sb.ssize); err != nil {
return PublicPieceInfo{}, err
}
defer fs.free(dataStaging, sb.ssize)
atomic.AddInt32(&sb.addPieceWait, 1)
ret := sb.RateLimit()
atomic.AddInt32(&sb.addPieceWait, -1)
defer ret()
f, werr, err := toReadableFile(file, int64(pieceSize))
if err != nil {
return PublicPieceInfo{}, err
}
stagedFile, err := sb.stagedSectorFile(sectorId)
if err != nil {
return PublicPieceInfo{}, err
}
_, _, commP, err := sectorbuilder.WriteWithAlignment(f, pieceSize, stagedFile, existingPieceSizes)
if err != nil {
return PublicPieceInfo{}, err
}
if err := stagedFile.Close(); err != nil {
return PublicPieceInfo{}, err
}
if err := f.Close(); err != nil {
return PublicPieceInfo{}, err
}
return PublicPieceInfo{
Size: pieceSize,
CommP: commP,
}, werr()
}
func (sb *SectorBuilder) ReadPieceFromSealedSector(sectorID uint64, offset uint64, size uint64, ticket []byte, commD []byte) (io.ReadCloser, error) {
fs := sb.filesystem
if err := fs.reserve(dataUnsealed, sb.ssize); err != nil { // TODO: this needs to get smarter when we start supporting partial unseals
return nil, err
}
defer fs.free(dataUnsealed, sb.ssize)
atomic.AddInt32(&sb.unsealWait, 1)
// TODO: Don't wait if cached
ret := sb.RateLimit() // TODO: check perf, consider remote unseal worker
defer ret()
atomic.AddInt32(&sb.unsealWait, -1)
sb.unsealLk.Lock() // TODO: allow unsealing unrelated sectors in parallel
defer sb.unsealLk.Unlock()
cacheDir, err := sb.sectorCacheDir(sectorID)
if err != nil {
return nil, err
}
sealedPath, err := sb.SealedSectorPath(sectorID)
if err != nil {
return nil, err
}
unsealedPath := sb.unsealedSectorPath(sectorID)
// TODO: GC for those
// (Probably configurable count of sectors to be kept unsealed, and just
// remove last used one (or use whatever other cache policy makes sense))
f, err := os.OpenFile(unsealedPath, os.O_RDONLY, 0644)
if err != nil {
if !os.IsNotExist(err) {
return nil, err
}
var commd [CommLen]byte
copy(commd[:], commD)
var tkt [CommLen]byte
copy(tkt[:], ticket)
err = sectorbuilder.Unseal(sb.ssize,
PoRepProofPartitions,
cacheDir,
sealedPath,
unsealedPath,
sectorID,
addressToProverID(sb.Miner),
tkt,
commd)
if err != nil {
return nil, xerrors.Errorf("unseal failed: %w", err)
}
f, err = os.OpenFile(unsealedPath, os.O_RDONLY, 0644)
if err != nil {
return nil, err
}
}
if _, err := f.Seek(int64(offset), io.SeekStart); err != nil {
return nil, xerrors.Errorf("seek: %w", err)
}
lr := io.LimitReader(f, int64(size))
return &struct {
io.Reader
io.Closer
}{
Reader: lr,
Closer: f,
}, nil
}
func (sb *SectorBuilder) sealPreCommitRemote(call workerCall) (RawSealPreCommitOutput, error) {
atomic.AddInt32(&sb.preCommitWait, -1)
select {
case ret := <-call.ret:
var err error
if ret.Err != "" {
err = xerrors.New(ret.Err)
}
return ret.Rspco.rspco(), err
case <-sb.stopping:
return RawSealPreCommitOutput{}, xerrors.New("sectorbuilder stopped")
}
}
func (sb *SectorBuilder) SealPreCommit(sectorID uint64, ticket SealTicket, pieces []PublicPieceInfo) (RawSealPreCommitOutput, error) {
fs := sb.filesystem
if err := fs.reserve(dataCache, sb.ssize); err != nil {
return RawSealPreCommitOutput{}, err
}
defer fs.free(dataCache, sb.ssize)
if err := fs.reserve(dataSealed, sb.ssize); err != nil {
return RawSealPreCommitOutput{}, err
}
defer fs.free(dataSealed, sb.ssize)
call := workerCall{
task: WorkerTask{
Type: WorkerPreCommit,
TaskID: atomic.AddUint64(&sb.taskCtr, 1),
SectorID: sectorID,
SealTicket: ticket,
Pieces: pieces,
},
ret: make(chan SealRes),
}
atomic.AddInt32(&sb.preCommitWait, 1)
select { // prefer remote
case sb.precommitTasks <- call:
return sb.sealPreCommitRemote(call)
default:
}
sb.checkRateLimit()
rl := sb.rateLimit
if sb.noPreCommit {
rl = make(chan struct{})
}
select { // use whichever is available
case sb.precommitTasks <- call:
return sb.sealPreCommitRemote(call)
case rl <- struct{}{}:
}
atomic.AddInt32(&sb.preCommitWait, -1)
// local
defer func() {
<-sb.rateLimit
}()
cacheDir, err := sb.sectorCacheDir(sectorID)
if err != nil {
return RawSealPreCommitOutput{}, xerrors.Errorf("getting cache dir: %w", err)
}
sealedPath, err := sb.SealedSectorPath(sectorID)
if err != nil {
return RawSealPreCommitOutput{}, xerrors.Errorf("getting sealed sector path: %w", err)
}
e, err := os.OpenFile(sealedPath, os.O_RDWR|os.O_CREATE, 0644)
if err != nil {
return RawSealPreCommitOutput{}, xerrors.Errorf("ensuring sealed file exists: %w", err)
}
if err := e.Close(); err != nil {
return RawSealPreCommitOutput{}, err
}
var sum uint64
for _, piece := range pieces {
sum += piece.Size
}
ussize := UserBytesForSectorSize(sb.ssize)
if sum != ussize {
return RawSealPreCommitOutput{}, xerrors.Errorf("aggregated piece sizes don't match sector size: %d != %d (%d)", sum, ussize, int64(ussize-sum))
}
stagedPath := sb.StagedSectorPath(sectorID)
rspco, err := sectorbuilder.SealPreCommit(
sb.ssize,
PoRepProofPartitions,
cacheDir,
stagedPath,
sealedPath,
sectorID,
addressToProverID(sb.Miner),
ticket.TicketBytes,
pieces,
)
if err != nil {
return RawSealPreCommitOutput{}, xerrors.Errorf("presealing sector %d (%s): %w", sectorID, stagedPath, err)
}
return RawSealPreCommitOutput(rspco), nil
}
func (sb *SectorBuilder) sealCommitRemote(call workerCall) (proof []byte, err error) {
atomic.AddInt32(&sb.commitWait, -1)
select {
case ret := <-call.ret:
if ret.Err != "" {
err = xerrors.New(ret.Err)
}
return ret.Proof, err
case <-sb.stopping:
return nil, xerrors.New("sectorbuilder stopped")
}
}
func (sb *SectorBuilder) sealCommitLocal(sectorID uint64, ticket SealTicket, seed SealSeed, pieces []PublicPieceInfo, rspco RawSealPreCommitOutput) (proof []byte, err error) {
atomic.AddInt32(&sb.commitWait, -1)
defer func() {
<-sb.rateLimit
}()
cacheDir, err := sb.sectorCacheDir(sectorID)
if err != nil {
return nil, err
}
proof, err = sectorbuilder.SealCommit(
sb.ssize,
PoRepProofPartitions,
cacheDir,
sectorID,
addressToProverID(sb.Miner),
ticket.TicketBytes,
seed.TicketBytes,
pieces,
sectorbuilder.RawSealPreCommitOutput(rspco),
)
if err != nil {
log.Warn("StandaloneSealCommit error: ", err)
log.Warnf("sid:%d tkt:%v seed:%v, ppi:%v rspco:%v", sectorID, ticket, seed, pieces, rspco)
return nil, xerrors.Errorf("StandaloneSealCommit: %w", err)
}
return proof, nil
}
func (sb *SectorBuilder) SealCommit(sectorID uint64, ticket SealTicket, seed SealSeed, pieces []PublicPieceInfo, rspco RawSealPreCommitOutput) (proof []byte, err error) {
call := workerCall{
task: WorkerTask{
Type: WorkerCommit,
TaskID: atomic.AddUint64(&sb.taskCtr, 1),
SectorID: sectorID,
SealTicket: ticket,
Pieces: pieces,
SealSeed: seed,
Rspco: rspco,
},
ret: make(chan SealRes),
}
atomic.AddInt32(&sb.commitWait, 1)
select { // prefer remote
case sb.commitTasks <- call:
proof, err = sb.sealCommitRemote(call)
default:
sb.checkRateLimit()
rl := sb.rateLimit
if sb.noCommit {
rl = make(chan struct{})
}
select { // use whichever is available
case sb.commitTasks <- call:
proof, err = sb.sealCommitRemote(call)
case rl <- struct{}{}:
proof, err = sb.sealCommitLocal(sectorID, ticket, seed, pieces, rspco)
}
}
if err != nil {
return nil, xerrors.Errorf("commit: %w", err)
}
return proof, nil
}
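// Illustrative only (not part of this change): the expected happy path for sealing a
// single sector is AcquireSectorId -> AddPiece (until the pieces fill the sector's
// user bytes) -> SealPreCommit with the chain ticket -> SealCommit with the
// interactive seed. A single full-size piece is assumed here for brevity.
func exampleSealFlow(sb *SectorBuilder, piece io.Reader, pieceSize uint64, ticket SealTicket, seed SealSeed) ([]byte, error) {
	sid, err := sb.AcquireSectorId()
	if err != nil {
		return nil, err
	}
	ppi, err := sb.AddPiece(pieceSize, sid, piece, nil)
	if err != nil {
		return nil, err
	}
	rspco, err := sb.SealPreCommit(sid, ticket, []PublicPieceInfo{ppi})
	if err != nil {
		return nil, err
	}
	return sb.SealCommit(sid, ticket, seed, []PublicPieceInfo{ppi}, rspco)
}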
func (sb *SectorBuilder) ComputeElectionPoSt(sectorInfo SortedPublicSectorInfo, challengeSeed []byte, winners []EPostCandidate) ([]byte, error) {
if len(challengeSeed) != CommLen {
return nil, xerrors.Errorf("given challenge seed was the wrong length: %d != %d", len(challengeSeed), CommLen)
}
var cseed [CommLen]byte
copy(cseed[:], challengeSeed)
privsects, err := sb.pubSectorToPriv(sectorInfo, nil) // TODO: faults
if err != nil {
return nil, err
}
proverID := addressToProverID(sb.Miner)
return sectorbuilder.GeneratePoSt(sb.ssize, proverID, privsects, cseed, winners)
}
func (sb *SectorBuilder) GenerateEPostCandidates(sectorInfo SortedPublicSectorInfo, challengeSeed [CommLen]byte, faults []uint64) ([]EPostCandidate, error) {
privsectors, err := sb.pubSectorToPriv(sectorInfo, faults)
if err != nil {
return nil, err
}
challengeCount := ElectionPostChallengeCount(uint64(len(sectorInfo.Values())), len(faults))
proverID := addressToProverID(sb.Miner)
return sectorbuilder.GenerateCandidates(sb.ssize, proverID, challengeSeed, challengeCount, privsectors)
}
func (sb *SectorBuilder) pubSectorToPriv(sectorInfo SortedPublicSectorInfo, faults []uint64) (SortedPrivateSectorInfo, error) {
fmap := map[uint64]struct{}{}
for _, fault := range faults {
fmap[fault] = struct{}{}
}
var out []sectorbuilder.PrivateSectorInfo
for _, s := range sectorInfo.Values() {
if _, faulty := fmap[s.SectorID]; faulty {
continue
}
cachePath, err := sb.sectorCacheDir(s.SectorID)
if err != nil {
return SortedPrivateSectorInfo{}, xerrors.Errorf("getting cache path for sector %d: %w", s.SectorID, err)
}
sealedPath, err := sb.SealedSectorPath(s.SectorID)
if err != nil {
return SortedPrivateSectorInfo{}, xerrors.Errorf("getting sealed path for sector %d: %w", s.SectorID, err)
}
out = append(out, sectorbuilder.PrivateSectorInfo{
SectorID: s.SectorID,
CommR: s.CommR,
CacheDirPath: cachePath,
SealedSectorPath: sealedPath,
})
}
return NewSortedPrivateSectorInfo(out), nil
}
func (sb *SectorBuilder) GenerateFallbackPoSt(sectorInfo SortedPublicSectorInfo, challengeSeed [CommLen]byte, faults []uint64) ([]EPostCandidate, []byte, error) {
privsectors, err := sb.pubSectorToPriv(sectorInfo, faults)
if err != nil {
return nil, nil, err
}
challengeCount := fallbackPostChallengeCount(uint64(len(sectorInfo.Values())), len(faults))
proverID := addressToProverID(sb.Miner)
candidates, err := sectorbuilder.GenerateCandidates(sb.ssize, proverID, challengeSeed, challengeCount, privsectors)
if err != nil {
return nil, nil, err
}
proof, err := sectorbuilder.GeneratePoSt(sb.ssize, proverID, privsectors, challengeSeed, candidates)
return candidates, proof, err
}
func (sb *SectorBuilder) Stop() {
close(sb.stopping)
}
func fallbackPostChallengeCount(sectors uint64, faults int) uint64 {
challengeCount := ElectionPostChallengeCount(sectors, faults)
if challengeCount > MaxFallbackPostChallengeCount {
return MaxFallbackPostChallengeCount
}
return challengeCount
}
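// Illustrative only (not part of this change): election PoSt splits candidate
// generation from proof computation so the miner can first check which candidates
// are winning tickets; here every candidate is simply passed through.
func exampleElectionPost(sb *SectorBuilder, provingSet SortedPublicSectorInfo, seed [CommLen]byte) ([]byte, error) {
	candidates, err := sb.GenerateEPostCandidates(provingSet, seed, nil)
	if err != nil {
		return nil, err
	}
	return sb.ComputeElectionPoSt(provingSet, seed[:], candidates)
}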
func (sb *SectorBuilder) ImportFrom(osb *SectorBuilder, symlink bool) error {
if err := migrate(osb.filesystem.pathFor(dataCache), sb.filesystem.pathFor(dataCache), symlink); err != nil {
return err
}
if err := migrate(osb.filesystem.pathFor(dataStaging), sb.filesystem.pathFor(dataStaging), symlink); err != nil {
return err
}
if err := migrate(osb.filesystem.pathFor(dataSealed), sb.filesystem.pathFor(dataSealed), symlink); err != nil {
return err
}
val, err := osb.ds.Get(lastSectorIdKey)
if err != nil {
return err
}
if err := sb.ds.Put(lastSectorIdKey, val); err != nil {
return err
}
sb.lastID = osb.lastID
return nil
}
func (sb *SectorBuilder) SetLastSectorID(id uint64) error {
if err := sb.ds.Put(lastSectorIdKey, []byte(fmt.Sprint(id))); err != nil {
return err
}
sb.lastID = id
return nil
}
func migrate(from, to string, symlink bool) error {
st, err := os.Stat(from)
if err != nil {
return err
}
if st.IsDir() {
return migrateDir(from, to, symlink)
}
return migrateFile(from, to, symlink)
}
func migrateDir(from, to string, symlink bool) error {
tost, err := os.Stat(to)
if err != nil {
if !os.IsNotExist(err) {
return err
}
if err := os.MkdirAll(to, 0755); err != nil {
return err
}
} else if !tost.IsDir() {
return xerrors.Errorf("target %q already exists and is a file (expected directory)")
}
dirents, err := ioutil.ReadDir(from)
if err != nil {
return err
}
for _, inf := range dirents {
n := inf.Name()
// files and directories are handled identically; migrate recurses into directories
if err := migrate(filepath.Join(from, n), filepath.Join(to, n), symlink); err != nil {
return err
}
}
return nil
}
func migrateFile(from, to string, symlink bool) error {
if symlink {
return os.Symlink(from, to)
}
return dcopy.Copy(from, to)
}
package sectorbuilder_test
import (
"context"
"fmt"
"io"
"io/ioutil"
"math/rand"
"os"
"runtime"
"sync"
"testing"
"time"
ffi "github.com/filecoin-project/filecoin-ffi"
"github.com/ipfs/go-datastore"
logging "github.com/ipfs/go-log"
sectorbuilder "github.com/filecoin-project/go-sectorbuilder"
"github.com/filecoin-project/go-sectorbuilder/paramfetch"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
func init() {
logging.SetLogLevel("*", "INFO") //nolint: errcheck
}
const sectorSize = 1024
type seal struct {
sid uint64
pco sectorbuilder.RawSealPreCommitOutput
ppi sectorbuilder.PublicPieceInfo
ticket sectorbuilder.SealTicket
}
func (s *seal) precommit(t *testing.T, sb *sectorbuilder.SectorBuilder, sid uint64, done func()) {
dlen := sectorbuilder.UserBytesForSectorSize(sectorSize)
var err error
r := io.LimitReader(rand.New(rand.NewSource(42+int64(sid))), int64(dlen))
s.ppi, err = sb.AddPiece(dlen, sid, r, []uint64{})
if err != nil {
t.Fatalf("%+v", err)
}
s.ticket = sectorbuilder.SealTicket{
BlockHeight: 5,
TicketBytes: [32]byte{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 1, 2},
}
s.pco, err = sb.SealPreCommit(sid, s.ticket, []sectorbuilder.PublicPieceInfo{s.ppi})
if err != nil {
t.Fatalf("%+v", err)
}
done()
}
func (s *seal) commit(t *testing.T, sb *sectorbuilder.SectorBuilder, done func()) {
seed := sectorbuilder.SealSeed{
BlockHeight: 15,
TicketBytes: [32]byte{0, 9, 8, 7, 6, 5, 4, 3, 2, 1, 0, 9, 8, 7, 6, 45, 3, 2, 1, 0, 9, 8, 7, 6, 5, 4, 3, 2, 1, 0, 9},
}
proof, err := sb.SealCommit(s.sid, s.ticket, seed, []sectorbuilder.PublicPieceInfo{s.ppi}, s.pco)
if err != nil {
t.Fatalf("%+v", err)
}
ok, err := sectorbuilder.VerifySeal(sectorSize, s.pco.CommR[:], s.pco.CommD[:], sb.Miner, s.ticket.TicketBytes[:], seed.TicketBytes[:], s.sid, proof)
if err != nil {
t.Fatalf("%+v", err)
}
if !ok {
t.Fatal("proof failed to validate")
}
done()
}
func post(t *testing.T, sb *sectorbuilder.SectorBuilder, seals ...seal) time.Time {
cSeed := [32]byte{0, 9, 2, 7, 6, 5, 4, 3, 2, 1, 0, 9, 8, 7, 6, 45, 3, 2, 1, 0, 9, 8, 7, 6, 5, 4, 3, 2, 1, 0, 9}
ppi := make([]ffi.PublicSectorInfo, len(seals))
for i, s := range seals {
ppi[i] = ffi.PublicSectorInfo{
SectorID: s.sid,
CommR: s.pco.CommR,
}
}
ssi := sectorbuilder.NewSortedPublicSectorInfo(ppi)
candidates, err := sb.GenerateEPostCandidates(ssi, cSeed, []uint64{})
if err != nil {
t.Fatalf("%+v", err)
}
genCandidates := time.Now()
if len(candidates) != 1 {
t.Fatal("expected 1 candidate")
}
postProof, err := sb.ComputeElectionPoSt(ssi, cSeed[:], candidates)
if err != nil {
t.Fatalf("%+v", err)
}
ok, err := sectorbuilder.VerifyElectionPost(context.TODO(), sb.SectorSize(), ssi, cSeed[:], postProof, candidates, sb.Miner)
if err != nil {
t.Fatalf("%+v", err)
}
if !ok {
t.Fatal("bad post")
}
return genCandidates
}
func TestSealAndVerify(t *testing.T) {
if testing.Short() {
t.Skip("skipping test in short mode")
}
if runtime.NumCPU() < 10 && os.Getenv("CI") == "" { // don't bother on slow hardware
t.Skip("this is slow")
}
_ = os.Setenv("RUST_LOG", "info")
if err := paramfetch.GetParams(sectorSize); err != nil {
t.Fatalf("%+v", err)
}
ds := datastore.NewMapDatastore()
dir, err := ioutil.TempDir("", "sbtest")
if err != nil {
t.Fatal(err)
}
sb, err := sectorbuilder.TempSectorbuilderDir(dir, sectorSize, ds)
if err != nil {
t.Fatalf("%+v", err)
}
cleanup := func() {
if t.Failed() {
fmt.Printf("not removing %s\n", dir)
return
}
if err := os.RemoveAll(dir); err != nil {
t.Error(err)
}
}
defer cleanup()
si, err := sb.AcquireSectorId()
if err != nil {
t.Fatalf("%+v", err)
}
s := seal{sid: si}
start := time.Now()
s.precommit(t, sb, 1, func() {})
precommit := time.Now()
s.commit(t, sb, func() {})
commit := time.Now()
genCandidates := post(t, sb, s)
epost := time.Now()
// Restart sectorbuilder, re-run post
sb, err = sectorbuilder.TempSectorbuilderDir(dir, sectorSize, ds)
if err != nil {
t.Fatalf("%+v", err)
}
post(t, sb, s)
fmt.Printf("PreCommit: %s\n", precommit.Sub(start).String())
fmt.Printf("Commit: %s\n", commit.Sub(precommit).String())
fmt.Printf("GenCandidates: %s\n", genCandidiates.Sub(commit).String())
fmt.Printf("EPoSt: %s\n", epost.Sub(genCandidiates).String())
}
func TestSealPoStNoCommit(t *testing.T) {
if testing.Short() {
t.Skip("skipping test in short mode")
}
if runtime.NumCPU() < 10 && os.Getenv("CI") == "" { // don't bother on slow hardware
t.Skip("this is slow")
}
_ = os.Setenv("RUST_LOG", "info")
if err := paramfetch.GetParams(sectorSize); err != nil {
t.Fatalf("%+v", err)
}
ds := datastore.NewMapDatastore()
dir, err := ioutil.TempDir("", "sbtest")
if err != nil {
t.Fatal(err)
}
sb, err := sectorbuilder.TempSectorbuilderDir(dir, sectorSize, ds)
if err != nil {
t.Fatalf("%+v", err)
}
cleanup := func() {
if t.Failed() {
fmt.Printf("not removing %s\n", dir)
return
}
if err := os.RemoveAll(dir); err != nil {
t.Error(err)
}
}
defer cleanup()
si, err := sb.AcquireSectorId()
if err != nil {
t.Fatalf("%+v", err)
}
s := seal{sid: si}
start := time.Now()
s.precommit(t, sb, 1, func() {})
precommit := time.Now()
// Restart sectorbuilder, re-run post
sb, err = sectorbuilder.TempSectorbuilderDir(dir, sectorSize, ds)
if err != nil {
t.Fatalf("%+v", err)
}
if err := sb.TrimCache(1); err != nil {
t.Fatal(err)
}
genCandidates := post(t, sb, s)
epost := time.Now()
fmt.Printf("PreCommit: %s\n", precommit.Sub(start).String())
fmt.Printf("GenCandidates: %s\n", genCandidiates.Sub(precommit).String())
fmt.Printf("EPoSt: %s\n", epost.Sub(genCandidiates).String())
}
func TestSealAndVerify2(t *testing.T) {
if testing.Short() {
t.Skip("skipping test in short mode")
}
if runtime.NumCPU() < 10 && os.Getenv("CI") == "" { // don't bother on slow hardware
t.Skip("this is slow")
}
_ = os.Setenv("RUST_LOG", "info")
if err := paramfetch.GetParams(sectorSize); err != nil {
t.Fatalf("%+v", err)
}
ds := datastore.NewMapDatastore()
dir, err := ioutil.TempDir("", "sbtest")
if err != nil {
t.Fatal(err)
}
sb, err := sectorbuilder.TempSectorbuilderDir(dir, sectorSize, ds)
if err != nil {
t.Fatalf("%+v", err)
}
cleanup := func() {
if err := os.RemoveAll(dir); err != nil {
t.Error(err)
}
}
defer cleanup()
var wg sync.WaitGroup
si1, err := sb.AcquireSectorId()
if err != nil {
t.Fatalf("%+v", err)
}
si2, err := sb.AcquireSectorId()
if err != nil {
t.Fatalf("%+v", err)
}
s1 := seal{sid: si1}
s2 := seal{sid: si2}
wg.Add(2)
go s1.precommit(t, sb, 1, wg.Done) //nolint: staticcheck
time.Sleep(100 * time.Millisecond)
go s2.precommit(t, sb, 2, wg.Done) //nolint: staticcheck
wg.Wait()
wg.Add(2)
go s1.commit(t, sb, wg.Done) //nolint: staticcheck
go s2.commit(t, sb, wg.Done) //nolint: staticcheck
wg.Wait()
post(t, sb, s1, s2)
}
func TestAcquireID(t *testing.T) {
ds := datastore.NewMapDatastore()
dir, err := ioutil.TempDir("", "sbtest")
if err != nil {
t.Fatal(err)
}
sb, err := sectorbuilder.TempSectorbuilderDir(dir, sectorSize, ds)
if err != nil {
t.Fatalf("%+v", err)
}
assertAcquire := func(expect uint64) {
id, err := sb.AcquireSectorId()
require.NoError(t, err)
assert.Equal(t, expect, id)
}
assertAcquire(1)
assertAcquire(2)
assertAcquire(3)
sb, err = sectorbuilder.TempSectorbuilderDir(dir, sectorSize, ds)
if err != nil {
t.Fatalf("%+v", err)
}
assertAcquire(4)
assertAcquire(5)
assertAcquire(6)
if err := os.RemoveAll(dir); err != nil {
t.Error(err)
}
}
package sectorbuilder
import (
"context"
"io"
sectorbuilder "github.com/filecoin-project/filecoin-ffi"
"go.opencensus.io/trace"
"github.com/filecoin-project/go-address"
)
func (sb *SectorBuilder) SectorSize() uint64 {
return sb.ssize
}
var UserBytesForSectorSize = sectorbuilder.GetMaxUserBytesPerStagedSector
func VerifySeal(sectorSize uint64, commR, commD []byte, proverID address.Address, ticket []byte, seed []byte, sectorID uint64, proof []byte) (bool, error) {
var commRa, commDa, ticketa, seeda [32]byte
copy(commRa[:], commR)
copy(commDa[:], commD)
copy(ticketa[:], ticket)
copy(seeda[:], seed)
proverIDa := addressToProverID(proverID)
return sectorbuilder.VerifySeal(sectorSize, commRa, commDa, proverIDa, ticketa, seeda, sectorID, proof)
}
func NewSortedPrivateSectorInfo(sectors []sectorbuilder.PrivateSectorInfo) SortedPrivateSectorInfo {
return sectorbuilder.NewSortedPrivateSectorInfo(sectors...)
}
func NewSortedPublicSectorInfo(sectors []sectorbuilder.PublicSectorInfo) SortedPublicSectorInfo {
return sectorbuilder.NewSortedPublicSectorInfo(sectors...)
}
func VerifyElectionPost(ctx context.Context, sectorSize uint64, sectorInfo SortedPublicSectorInfo, challengeSeed []byte, proof []byte, candidates []EPostCandidate, proverID address.Address) (bool, error) {
challengeCount := ElectionPostChallengeCount(uint64(len(sectorInfo.Values())), 0)
return verifyPost(ctx, sectorSize, sectorInfo, challengeCount, challengeSeed, proof, candidates, proverID)
}
func VerifyFallbackPost(ctx context.Context, sectorSize uint64, sectorInfo SortedPublicSectorInfo, challengeSeed []byte, proof []byte, candidates []EPostCandidate, proverID address.Address, faults int) (bool, error) {
challengeCount := fallbackPostChallengeCount(uint64(len(sectorInfo.Values())), faults)
return verifyPost(ctx, sectorSize, sectorInfo, challengeCount, challengeSeed, proof, candidates, proverID)
}
func verifyPost(ctx context.Context, sectorSize uint64, sectorInfo SortedPublicSectorInfo, challengeCount uint64, challengeSeed []byte, proof []byte, candidates []EPostCandidate, proverID address.Address) (bool, error) {
var challengeSeeda [CommLen]byte
copy(challengeSeeda[:], challengeSeed)
_, span := trace.StartSpan(ctx, "VerifyPoSt")
defer span.End()
prover := addressToProverID(proverID)
return sectorbuilder.VerifyPoSt(sectorSize, sectorInfo, challengeSeeda, challengeCount, proof, candidates, prover)
}
func GeneratePieceCommitment(piece io.Reader, pieceSize uint64) (commP [CommLen]byte, err error) {
f, werr, err := toReadableFile(piece, int64(pieceSize))
if err != nil {
return [32]byte{}, err
}
commP, err = sectorbuilder.GeneratePieceCommitmentFromFile(f, pieceSize)
if err != nil {
return [32]byte{}, err
}
return commP, werr()
}
func GenerateDataCommitment(ssize uint64, pieces []sectorbuilder.PublicPieceInfo) ([CommLen]byte, error) {
return sectorbuilder.GenerateDataCommitment(ssize, pieces)
}
package go_sectorbuilder
import (
"unsafe"
"github.com/filecoin-project/go-sectorbuilder/sealed_sector_health"
"github.com/pkg/errors"
)
// #cgo LDFLAGS: ${SRCDIR}/libsector_builder_ffi.a
// #cgo pkg-config: ${SRCDIR}/sector_builder_ffi.pc
// #include "./sector_builder_ffi.h"
import "C"
// SingleProofPartitionProofLen denotes the number of bytes in a proof generated
// with a single partition. The number of bytes in a proof increases linearly
// with the number of partitions used when creating that proof.
const SingleProofPartitionProofLen = 192
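// Illustrative only (not part of this change): a proof produced with
// poRepProofPartitions partitions is poRepProofPartitions*SingleProofPartitionProofLen
// bytes long, e.g. 10 partitions -> 10*192 = 1920 bytes.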
func cPublicPieceInfo(src []PublicPieceInfo) (*C.sector_builder_ffi_FFIPublicPieceInfo, C.size_t) {
srcCSizeT := C.size_t(len(src))
// allocate array in C heap
cPublicPieceInfos := C.malloc(srcCSizeT * C.sizeof_sector_builder_ffi_FFIPublicPieceInfo)
// create a Go slice backed by the C-array
xs := (*[1 << 30]C.sector_builder_ffi_FFIPublicPieceInfo)(cPublicPieceInfos)
for i, v := range src {
xs[i] = C.sector_builder_ffi_FFIPublicPieceInfo{
num_bytes: C.uint64_t(v.Size),
comm_p: *(*[32]C.uint8_t)(unsafe.Pointer(&v.CommP)),
}
}
return (*C.sector_builder_ffi_FFIPublicPieceInfo)(cPublicPieceInfos), srcCSizeT
}
func cPieceMetadata(src []PieceMetadata) (*C.sector_builder_ffi_FFIPieceMetadata, C.size_t) {
srcCSizeT := C.size_t(len(src))
// allocate array in C heap
cPieceMetadata := C.malloc(srcCSizeT * C.sizeof_sector_builder_ffi_FFIPieceMetadata)
// create a Go slice backed by the C-array
xs := (*[1 << 30]C.sector_builder_ffi_FFIPieceMetadata)(cPieceMetadata)
for i, v := range src {
xs[i] = C.sector_builder_ffi_FFIPieceMetadata{
piece_key: C.CString(v.Key),
num_bytes: C.uint64_t(v.Size),
comm_p: *(*[32]C.uint8_t)(unsafe.Pointer(&v.CommP)),
}
}
return (*C.sector_builder_ffi_FFIPieceMetadata)(cPieceMetadata), srcCSizeT
}
func cUint64s(src []uint64) (*C.uint64_t, C.size_t) {
srcCSizeT := C.size_t(len(src))
// allocate array in C heap
cUint64s := C.malloc(srcCSizeT * C.sizeof_uint64_t)
// create a Go slice backed by the C-array
pp := (*[1 << 30]C.uint64_t)(cUint64s)
for i, v := range src {
pp[i] = C.uint64_t(v)
}
return (*C.uint64_t)(cUint64s), srcCSizeT
}
func cSectorClass(sectorSize uint64, poRepProofPartitions uint8) C.sector_builder_ffi_FFISectorClass {
return C.sector_builder_ffi_FFISectorClass{
sector_size: C.uint64_t(sectorSize),
porep_proof_partitions: C.uint8_t(poRepProofPartitions),
}
}
func cSealPreCommitOutput(src RawSealPreCommitOutput) C.sector_builder_ffi_FFISealPreCommitOutput {
return C.sector_builder_ffi_FFISealPreCommitOutput{
comm_d: *(*[32]C.uint8_t)(unsafe.Pointer(&src.CommD)),
comm_r: *(*[32]C.uint8_t)(unsafe.Pointer(&src.CommR)),
p_aux_comm_c: *(*[32]C.uint8_t)(unsafe.Pointer(&src.CommC)),
p_aux_comm_r_last: *(*[32]C.uint8_t)(unsafe.Pointer(&src.CommRLast)),
}
}
func cCandidates(src []Candidate) (*C.sector_builder_ffi_FFICandidate, C.size_t) {
srcCSizeT := C.size_t(len(src))
// allocate array in C heap
cCandidates := C.malloc(srcCSizeT * C.sizeof_sector_builder_ffi_FFICandidate)
// create a Go slice backed by the C-array
pp := (*[1 << 30]C.sector_builder_ffi_FFICandidate)(cCandidates)
for i, v := range src {
pp[i] = C.sector_builder_ffi_FFICandidate{
sector_id: C.uint64_t(v.SectorID),
partial_ticket: *(*[32]C.uint8_t)(unsafe.Pointer(&v.PartialTicket)),
ticket: *(*[32]C.uint8_t)(unsafe.Pointer(&v.Ticket)),
sector_challenge_index: C.uint64_t(v.SectorChallengeIndex),
}
}
return (*C.sector_builder_ffi_FFICandidate)(cCandidates), srcCSizeT
}
func cPrivateReplicaInfos(src []PrivateSectorInfo) (*C.sector_builder_ffi_FFIPrivateReplicaInfo, C.size_t) {
srcCSizeT := C.size_t(len(src))
cPrivateReplicas := C.malloc(srcCSizeT * C.sizeof_sector_builder_ffi_FFIPrivateReplicaInfo)
pp := (*[1 << 30]C.sector_builder_ffi_FFIPrivateReplicaInfo)(cPrivateReplicas)
for i, v := range src {
pp[i] = C.sector_builder_ffi_FFIPrivateReplicaInfo{
cache_dir_path: C.CString(v.CacheDirPath),
comm_r: *(*[32]C.uint8_t)(unsafe.Pointer(&v.CommR)),
replica_path: C.CString(v.SealedSectorPath),
sector_id: C.uint64_t(v.SectorID),
}
}
return (*C.sector_builder_ffi_FFIPrivateReplicaInfo)(cPrivateReplicas), srcCSizeT
}
func goBytes(src *C.uint8_t, size C.size_t) []byte {
return C.GoBytes(unsafe.Pointer(src), C.int(size))
}
func goSealTicket(src C.sector_builder_ffi_FFISealTicket) SealTicket {
return SealTicket{
TicketBytes: goCommitment(&src.ticket_bytes[0]),
BlockHeight: uint64(src.block_height),
}
}
func goCandidates(src *C.sector_builder_ffi_FFICandidate, size C.size_t) ([]Candidate, error) {
candidates := make([]Candidate, size)
if src == nil || size == 0 {
return candidates, nil
}
ptrs := (*[1 << 30]C.sector_builder_ffi_FFICandidate)(unsafe.Pointer(src))[:size:size]
for i := 0; i < int(size); i++ {
candidates[i] = goCandidate(ptrs[i])
}
return candidates, nil
}
func goCandidate(src C.sector_builder_ffi_FFICandidate) Candidate {
return Candidate{
SectorID: uint64(src.sector_id),
PartialTicket: goCommitment(&src.partial_ticket[0]),
Ticket: goCommitment(&src.ticket[0]),
SectorChallengeIndex: uint64(src.sector_challenge_index),
}
}
func goRawSealPreCommitOutput(src C.sector_builder_ffi_FFISealPreCommitOutput) RawSealPreCommitOutput {
return RawSealPreCommitOutput{
CommD: goCommitment(&src.comm_d[0]),
CommR: goCommitment(&src.comm_r[0]),
CommRLast: goCommitment(&src.p_aux_comm_r_last[0]),
CommC: goCommitment(&src.p_aux_comm_c[0]),
}
}
func goCommitment(src *C.uint8_t) [32]byte {
slice := C.GoBytes(unsafe.Pointer(src), 32)
var array [CommitmentBytesLen]byte
copy(array[:], slice)
return array
}
func goSectorBuilderSealCommitOutput(src *C.sector_builder_ffi_SectorBuilderSealCommitResponse) (SealCommitOutput, error) {
commDSlice := goBytes(&src.comm_d[0], CommitmentBytesLen)
var commD [CommitmentBytesLen]byte
copy(commD[:], commDSlice)
commRSlice := goBytes(&src.comm_r[0], CommitmentBytesLen)
var commR [CommitmentBytesLen]byte
copy(commR[:], commRSlice)
proof := goBytes(src.proofs_ptr, src.proofs_len)
pieces, err := goPieceMetadata(src.pieces_ptr, src.pieces_len)
if err != nil {
return SealCommitOutput{}, errors.Wrap(err, "failed to marshal piece metadata")
}
return SealCommitOutput{
SectorID: uint64(src.sector_id),
CommD: commD,
CommR: commR,
Proof: proof,
Pieces: pieces,
Ticket: goSealTicket(src.seal_ticket),
Seed: goSealSeed(src.seal_seed),
}, nil
}
func goResumeSealCommitOutput(src *C.sector_builder_ffi_ResumeSealCommitResponse) (SealCommitOutput, error) {
commDSlice := goBytes(&src.comm_d[0], CommitmentBytesLen)
var commD [CommitmentBytesLen]byte
copy(commD[:], commDSlice)
commRSlice := goBytes(&src.comm_r[0], CommitmentBytesLen)
var commR [CommitmentBytesLen]byte
copy(commR[:], commRSlice)
proof := goBytes(src.proofs_ptr, src.proofs_len)
pieces, err := goPieceMetadata(src.pieces_ptr, src.pieces_len)
if err != nil {
return SealCommitOutput{}, errors.Wrap(err, "failed to marshal piece metadata")
}
return SealCommitOutput{
SectorID: uint64(src.sector_id),
CommD: commD,
CommR: commR,
Proof: proof,
Pieces: pieces,
Ticket: goSealTicket(src.seal_ticket),
Seed: goSealSeed(src.seal_seed),
}, nil
}
func goSectorBuilderSealPreCommitOutput(src *C.sector_builder_ffi_SectorBuilderSealPreCommitResponse) (SealPreCommitOutput, error) {
commDSlice := goBytes(&src.comm_d[0], CommitmentBytesLen)
var commD [CommitmentBytesLen]byte
copy(commD[:], commDSlice)
commRSlice := goBytes(&src.comm_r[0], CommitmentBytesLen)
var commR [CommitmentBytesLen]byte
copy(commR[:], commRSlice)
pieces, err := goPieceMetadata(src.pieces_ptr, src.pieces_len)
if err != nil {
return SealPreCommitOutput{}, errors.Wrap(err, "failed to marshal piece metadata")
}
return SealPreCommitOutput{
SectorID: uint64(src.sector_id),
CommD: commD,
CommR: commR,
Pieces: pieces,
Ticket: goSealTicket(src.seal_ticket),
}, nil
}
func goResumeSealPreCommitOutput(src *C.sector_builder_ffi_ResumeSealPreCommitResponse) (SealPreCommitOutput, error) {
commDSlice := goBytes(&src.comm_d[0], CommitmentBytesLen)
var commD [CommitmentBytesLen]byte
copy(commD[:], commDSlice)
commRSlice := goBytes(&src.comm_r[0], CommitmentBytesLen)
var commR [CommitmentBytesLen]byte
copy(commR[:], commRSlice)
pieces, err := goPieceMetadata(src.pieces_ptr, src.pieces_len)
if err != nil {
return SealPreCommitOutput{}, errors.Wrap(err, "failed to marshal piece metadata")
}
return SealPreCommitOutput{
SectorID: uint64(src.sector_id),
CommD: commD,
CommR: commR,
Pieces: pieces,
Ticket: goSealTicket(src.seal_ticket),
}, nil
}
func goSealSeed(src C.sector_builder_ffi_FFISealTicket) SealSeed {
seedBytesSlice := C.GoBytes(unsafe.Pointer(&src.ticket_bytes[0]), 32)
var seedBytes [CommitmentBytesLen]byte
copy(seedBytes[:], seedBytesSlice)
return SealSeed{
TicketBytes: seedBytes,
BlockHeight: uint64(src.block_height),
}
}
func goStagedSectorMetadata(src *C.sector_builder_ffi_FFIStagedSectorMetadata, size C.size_t) ([]StagedSectorMetadata, error) {
sectors := make([]StagedSectorMetadata, size)
if src == nil || size == 0 {
return sectors, nil
}
sectorPtrs := (*[1 << 30]C.sector_builder_ffi_FFIStagedSectorMetadata)(unsafe.Pointer(src))[:size:size]
for i := 0; i < int(size); i++ {
sectors[i] = StagedSectorMetadata{
SectorID: uint64(sectorPtrs[i].sector_id),
}
}
return sectors, nil
}
func goSealedSectorMetadata(src *C.sector_builder_ffi_FFISealedSectorMetadata, size C.size_t) ([]SealedSectorMetadata, error) {
sectors := make([]SealedSectorMetadata, size)
if src == nil || size == 0 {
return sectors, nil
}
ptrs := (*[1 << 30]C.sector_builder_ffi_FFISealedSectorMetadata)(unsafe.Pointer(src))[:size:size]
for i := 0; i < int(size); i++ {
commDSlice := goBytes(&ptrs[i].comm_d[0], CommitmentBytesLen)
var commD [CommitmentBytesLen]byte
copy(commD[:], commDSlice)
commRSlice := goBytes(&ptrs[i].comm_r[0], CommitmentBytesLen)
var commR [CommitmentBytesLen]byte
copy(commR[:], commRSlice)
proof := goBytes(ptrs[i].proofs_ptr, ptrs[i].proofs_len)
pieces, err := goPieceMetadata(ptrs[i].pieces_ptr, ptrs[i].pieces_len)
if err != nil {
return []SealedSectorMetadata{}, errors.Wrap(err, "failed to marshal piece metadata")
}
health, err := goSealedSectorHealth(ptrs[i].health)
if err != nil {
return []SealedSectorMetadata{}, errors.Wrap(err, "failed to marshal sealed sector health")
}
sectors[i] = SealedSectorMetadata{
SectorID: uint64(ptrs[i].sector_id),
CommD: commD,
CommR: commR,
Proof: proof,
Pieces: pieces,
Health: health,
Ticket: goSealTicket(ptrs[i].seal_ticket),
Seed: goSealSeed(ptrs[i].seal_seed),
}
}
return sectors, nil
}
func goPieceMetadata(src *C.sector_builder_ffi_FFIPieceMetadata, size C.size_t) ([]PieceMetadata, error) {
ps := make([]PieceMetadata, size)
if src == nil || size == 0 {
return ps, nil
}
ptrs := (*[1 << 30]C.sector_builder_ffi_FFIPieceMetadata)(unsafe.Pointer(src))[:size:size]
for i := 0; i < int(size); i++ {
commPSlice := goBytes(&ptrs[i].comm_p[0], CommitmentBytesLen)
var commP [CommitmentBytesLen]byte
copy(commP[:], commPSlice)
ps[i] = PieceMetadata{
Key: C.GoString(ptrs[i].piece_key),
Size: uint64(ptrs[i].num_bytes),
CommP: commP,
}
}
return ps, nil
}
func goSealedSectorHealth(health C.sector_builder_ffi_FFISealedSectorHealth) (sealed_sector_health.Health, error) {
switch health {
case C.Unknown:
return sealed_sector_health.Unknown, nil
case C.Ok:
return sealed_sector_health.Ok, nil
case C.ErrorInvalidChecksum:
return sealed_sector_health.InvalidChecksum, nil
case C.ErrorInvalidLength:
return sealed_sector_health.InvalidLength, nil
case C.ErrorMissing:
return sealed_sector_health.Missing, nil
default:
return sealed_sector_health.Unknown, errors.Errorf("unhandled sealed sector health: %v", health)
}
}