//+build cgo package sectorbuilder import ( "context" "io" "os" "sync/atomic" ffi "github.com/filecoin-project/filecoin-ffi" "golang.org/x/xerrors" ) var _ Interface = &SectorBuilder{} func (sb *SectorBuilder) AddPiece(pieceSize uint64, sectorId uint64, file io.Reader, existingPieceSizes []uint64) (PublicPieceInfo, error) { fs := sb.filesystem if err := fs.reserve(dataStaging, sb.ssize); err != nil { return PublicPieceInfo{}, err } defer fs.free(dataStaging, sb.ssize) atomic.AddInt32(&sb.addPieceWait, 1) ret := sb.RateLimit() atomic.AddInt32(&sb.addPieceWait, -1) defer ret() f, werr, err := toReadableFile(file, int64(pieceSize)) if err != nil { return PublicPieceInfo{}, err } stagedFile, err := sb.stagedSectorFile(sectorId) if err != nil { return PublicPieceInfo{}, err } _, _, commP, err := ffi.WriteWithAlignment(f, pieceSize, stagedFile, existingPieceSizes) if err != nil { return PublicPieceInfo{}, err } if err := stagedFile.Close(); err != nil { return PublicPieceInfo{}, err } if err := f.Close(); err != nil { return PublicPieceInfo{}, err } return PublicPieceInfo{ Size: pieceSize, CommP: commP, }, werr() } func (sb *SectorBuilder) ReadPieceFromSealedSector(sectorID uint64, offset uint64, size uint64, ticket []byte, commD []byte) (io.ReadCloser, error) { fs := sb.filesystem if err := fs.reserve(dataUnsealed, sb.ssize); err != nil { // TODO: this needs to get smarter when we start supporting partial unseals return nil, err } defer fs.free(dataUnsealed, sb.ssize) atomic.AddInt32(&sb.unsealWait, 1) // TODO: Don't wait if cached ret := sb.RateLimit() // TODO: check perf, consider remote unseal worker defer ret() atomic.AddInt32(&sb.unsealWait, -1) sb.unsealLk.Lock() // TODO: allow unsealing unrelated sectors in parallel defer sb.unsealLk.Unlock() cacheDir, err := sb.sectorCacheDir(sectorID) if err != nil { return nil, err } sealedPath, err := sb.SealedSectorPath(sectorID) if err != nil { return nil, err } unsealedPath := sb.unsealedSectorPath(sectorID) // TODO: GC for those // (Probably configurable count of sectors to be kept unsealed, and just // remove last used one (or use whatever other cache policy makes sense)) f, err := os.OpenFile(unsealedPath, os.O_RDONLY, 0644) if err != nil { if !os.IsNotExist(err) { return nil, err } var commd [CommLen]byte copy(commd[:], commD) var tkt [CommLen]byte copy(tkt[:], ticket) err = ffi.Unseal(sb.ssize, PoRepProofPartitions, cacheDir, sealedPath, unsealedPath, sectorID, addressToProverID(sb.Miner), tkt, commd) if err != nil { return nil, xerrors.Errorf("unseal failed: %w", err) } f, err = os.OpenFile(unsealedPath, os.O_RDONLY, 0644) if err != nil { return nil, err } } if _, err := f.Seek(int64(offset), io.SeekStart); err != nil { return nil, xerrors.Errorf("seek: %w", err) } lr := io.LimitReader(f, int64(size)) return &struct { io.Reader io.Closer }{ Reader: lr, Closer: f, }, nil } func (sb *SectorBuilder) SealPreCommit(ctx context.Context, sectorID uint64, ticket SealTicket, pieces []PublicPieceInfo) (RawSealPreCommitOutput, error) { fs := sb.filesystem if err := fs.reserve(dataCache, sb.ssize); err != nil { return RawSealPreCommitOutput{}, err } defer fs.free(dataCache, sb.ssize) if err := fs.reserve(dataSealed, sb.ssize); err != nil { return RawSealPreCommitOutput{}, err } defer fs.free(dataSealed, sb.ssize) call := workerCall{ task: WorkerTask{ Type: WorkerPreCommit, TaskID: atomic.AddUint64(&sb.taskCtr, 1), SectorID: sectorID, SealTicket: ticket, Pieces: pieces, }, ret: make(chan SealRes), } atomic.AddInt32(&sb.preCommitWait, 1) select { // prefer remote case sb.precommitTasks <- call: return sb.sealPreCommitRemote(call) default: } sb.checkRateLimit() rl := sb.rateLimit if sb.noPreCommit { rl = make(chan struct{}) } select { // use whichever is available case sb.precommitTasks <- call: return sb.sealPreCommitRemote(call) case rl <- struct{}{}: case <-ctx.Done(): return RawSealPreCommitOutput{}, ctx.Err() } atomic.AddInt32(&sb.preCommitWait, -1) // local defer func() { <-sb.rateLimit }() cacheDir, err := sb.sectorCacheDir(sectorID) if err != nil { return RawSealPreCommitOutput{}, xerrors.Errorf("getting cache dir: %w", err) } sealedPath, err := sb.SealedSectorPath(sectorID) if err != nil { return RawSealPreCommitOutput{}, xerrors.Errorf("getting sealed sector path: %w", err) } e, err := os.OpenFile(sealedPath, os.O_RDWR|os.O_CREATE, 0644) if err != nil { return RawSealPreCommitOutput{}, xerrors.Errorf("ensuring sealed file exists: %w", err) } if err := e.Close(); err != nil { return RawSealPreCommitOutput{}, err } var sum uint64 for _, piece := range pieces { sum += piece.Size } ussize := UserBytesForSectorSize(sb.ssize) if sum != ussize { return RawSealPreCommitOutput{}, xerrors.Errorf("aggregated piece sizes don't match sector size: %d != %d (%d)", sum, ussize, int64(ussize-sum)) } stagedPath := sb.StagedSectorPath(sectorID) // TODO: context cancellation respect rspco, err := ffi.SealPreCommit( sb.ssize, PoRepProofPartitions, cacheDir, stagedPath, sealedPath, sectorID, addressToProverID(sb.Miner), ticket.TicketBytes, pieces, ) if err != nil { return RawSealPreCommitOutput{}, xerrors.Errorf("presealing sector %d (%s): %w", sectorID, stagedPath, err) } return RawSealPreCommitOutput(rspco), nil } func (sb *SectorBuilder) sealCommitLocal(sectorID uint64, ticket SealTicket, seed SealSeed, pieces []PublicPieceInfo, rspco RawSealPreCommitOutput) (proof []byte, err error) { atomic.AddInt32(&sb.commitWait, -1) defer func() { <-sb.rateLimit }() cacheDir, err := sb.sectorCacheDir(sectorID) if err != nil { return nil, err } proof, err = ffi.SealCommit( sb.ssize, PoRepProofPartitions, cacheDir, sectorID, addressToProverID(sb.Miner), ticket.TicketBytes, seed.TicketBytes, pieces, ffi.RawSealPreCommitOutput(rspco), ) if err != nil { log.Warn("StandaloneSealCommit error: ", err) log.Warnf("sid:%d tkt:%v seed:%v, ppi:%v rspco:%v", sectorID, ticket, seed, pieces, rspco) return nil, xerrors.Errorf("StandaloneSealCommit: %w", err) } return proof, nil } func (sb *SectorBuilder) SealCommit(ctx context.Context, sectorID uint64, ticket SealTicket, seed SealSeed, pieces []PublicPieceInfo, rspco RawSealPreCommitOutput) (proof []byte, err error) { call := workerCall{ task: WorkerTask{ Type: WorkerCommit, TaskID: atomic.AddUint64(&sb.taskCtr, 1), SectorID: sectorID, SealTicket: ticket, Pieces: pieces, SealSeed: seed, Rspco: rspco, }, ret: make(chan SealRes), } atomic.AddInt32(&sb.commitWait, 1) select { // prefer remote case sb.commitTasks <- call: proof, err = sb.sealCommitRemote(call) default: sb.checkRateLimit() rl := sb.rateLimit if sb.noCommit { rl = make(chan struct{}) } select { // use whichever is available case sb.commitTasks <- call: proof, err = sb.sealCommitRemote(call) case rl <- struct{}{}: proof, err = sb.sealCommitLocal(sectorID, ticket, seed, pieces, rspco) case <-ctx.Done(): return nil, ctx.Err() } } if err != nil { return nil, xerrors.Errorf("commit: %w", err) } return proof, nil } func (sb *SectorBuilder) ComputeElectionPoSt(sectorInfo SortedPublicSectorInfo, challengeSeed []byte, winners []EPostCandidate) ([]byte, error) { if len(challengeSeed) != CommLen { return nil, xerrors.Errorf("given challenge seed was the wrong length: %d != %d", len(challengeSeed), CommLen) } var cseed [CommLen]byte copy(cseed[:], challengeSeed) privsects, err := sb.pubSectorToPriv(sectorInfo, nil) // TODO: faults if err != nil { return nil, err } proverID := addressToProverID(sb.Miner) return ffi.GeneratePoSt(sb.ssize, proverID, privsects, cseed, winners) } func (sb *SectorBuilder) GenerateEPostCandidates(sectorInfo SortedPublicSectorInfo, challengeSeed [CommLen]byte, faults []uint64) ([]EPostCandidate, error) { privsectors, err := sb.pubSectorToPriv(sectorInfo, faults) if err != nil { return nil, err } challengeCount := ElectionPostChallengeCount(uint64(len(sectorInfo.Values())), uint64(len(faults))) proverID := addressToProverID(sb.Miner) return ffi.GenerateCandidates(sb.ssize, proverID, challengeSeed, challengeCount, privsectors) } func (sb *SectorBuilder) GenerateFallbackPoSt(sectorInfo SortedPublicSectorInfo, challengeSeed [CommLen]byte, faults []uint64) ([]EPostCandidate, []byte, error) { privsectors, err := sb.pubSectorToPriv(sectorInfo, faults) if err != nil { return nil, nil, err } challengeCount := fallbackPostChallengeCount(uint64(len(sectorInfo.Values())), uint64(len(faults))) proverID := addressToProverID(sb.Miner) candidates, err := ffi.GenerateCandidates(sb.ssize, proverID, challengeSeed, challengeCount, privsectors) if err != nil { return nil, nil, err } proof, err := ffi.GeneratePoSt(sb.ssize, proverID, privsectors, challengeSeed, candidates) return candidates, proof, err }