Commit 525c38d7 authored by Łukasz Magiera's avatar Łukasz Magiera
Browse files

Filesystem refactor

parent d9cc96c5
package sectorbuilder
import (
"fmt"
"context"
"io"
"io/ioutil"
"os"
......@@ -10,57 +10,37 @@ import (
"sync"
"golang.org/x/xerrors"
"github.com/filecoin-project/go-sectorbuilder/fs"
)
func (sb *SectorBuilder) SectorName(sectorID uint64) string {
return fmt.Sprintf("s-%s-%d", sb.Miner, sectorID)
}
func (sb *SectorBuilder) StagedSectorPath(sectorID uint64) string {
return filepath.Join(sb.filesystem.pathFor(dataStaging), sb.SectorName(sectorID))
}
func (sb *SectorBuilder) unsealedSectorPath(sectorID uint64) string {
return filepath.Join(sb.filesystem.pathFor(dataUnsealed), sb.SectorName(sectorID))
}
func (sb *SectorBuilder) stagedSectorFile(sectorID uint64) (*os.File, error) {
return os.OpenFile(sb.StagedSectorPath(sectorID), os.O_RDWR|os.O_CREATE, 0644)
return fs.SectorName(sb.Miner, sectorID)
}
func (sb *SectorBuilder) SealedSectorPath(sectorID uint64) (string, error) {
path := filepath.Join(sb.filesystem.pathFor(dataSealed), sb.SectorName(sectorID))
return path, nil
func (sb *SectorBuilder) SectorPath(typ fs.DataType, sectorID uint64) (fs.SectorPath, error) {
return sb.filesystem.FindSector(typ, sb.Miner, sectorID)
}
func (sb *SectorBuilder) sectorCacheDir(sectorID uint64) (string, error) {
dir := filepath.Join(sb.filesystem.pathFor(dataCache), sb.SectorName(sectorID))
err := os.Mkdir(dir, 0755)
if os.IsExist(err) {
err = nil
}
return dir, err
func (sb *SectorBuilder) AllocSectorPath(typ fs.DataType, sectorID uint64, cache bool) (fs.SectorPath, error) {
return sb.filesystem.AllocSector(typ, sb.Miner, sb.ssize, cache, sectorID)
}
func (sb *SectorBuilder) GetPath(typ string, sectorName string) (string, error) {
_, found := overheadMul[dataType(typ)]
if !found {
return "", xerrors.Errorf("unknown sector type: %s", typ)
}
return filepath.Join(sb.filesystem.pathFor(dataType(typ)), sectorName), nil
func (sb *SectorBuilder) ReleaseSector(typ fs.DataType, path fs.SectorPath) {
sb.filesystem.Release(typ, path, sb.ssize)
}
func (sb *SectorBuilder) TrimCache(sectorID uint64) error {
dir, err := sb.sectorCacheDir(sectorID)
func (sb *SectorBuilder) TrimCache(ctx context.Context, sectorID uint64) error {
dir, err := sb.filesystem.FindSector(fs.DataCache, sb.Miner, sectorID)
if err != nil {
return xerrors.Errorf("getting cache dir: %w", err)
}
if err := sb.filesystem.Lock(ctx, dir); err != nil {
return xerrors.Errorf("acquiring sector lock: %w", err)
}
defer sb.filesystem.Unlock(dir)
files, err := ioutil.ReadDir(dir)
files, err := ioutil.ReadDir(string(dir))
if err != nil {
return xerrors.Errorf("readdir: %w", err)
}
......@@ -73,7 +53,7 @@ func (sb *SectorBuilder) TrimCache(sectorID uint64) error {
continue
}
if err := os.Remove(filepath.Join(dir, file.Name())); err != nil {
if err := os.Remove(filepath.Join(string(dir), file.Name())); err != nil {
return xerrors.Errorf("rm %s: %w", file.Name(), err)
}
}
......@@ -82,12 +62,12 @@ func (sb *SectorBuilder) TrimCache(sectorID uint64) error {
}
func (sb *SectorBuilder) CanCommit(sectorID uint64) (bool, error) {
dir, err := sb.sectorCacheDir(sectorID)
dir, err := sb.SectorPath(fs.DataCache, sectorID)
if err != nil {
return false, xerrors.Errorf("getting cache dir: %w", err)
}
ents, err := ioutil.ReadDir(dir)
ents, err := ioutil.ReadDir(string(dir))
if err != nil {
return false, err
}
......
package sectorbuilder
import (
"os"
"path/filepath"
"sync"
"syscall"
"golang.org/x/xerrors"
)
type dataType string
const (
dataCache dataType = "cache"
dataStaging dataType = "staging"
dataSealed dataType = "sealed"
dataUnsealed dataType = "unsealed"
)
var overheadMul = map[dataType]uint64{ // * sectorSize
dataCache: 11, // TODO: check if true for 32G sectors
dataStaging: 1,
dataSealed: 1,
dataUnsealed: 1,
}
type fs struct {
path string
// in progress actions
reserved map[dataType]uint64
lk sync.Mutex
}
func openFs(dir string) *fs {
return &fs{
path: dir,
reserved: map[dataType]uint64{},
}
}
func (f *fs) init() error {
for _, dir := range []string{f.path,
f.pathFor(dataCache),
f.pathFor(dataStaging),
f.pathFor(dataSealed),
f.pathFor(dataUnsealed)} {
if err := os.Mkdir(dir, 0755); err != nil {
if os.IsExist(err) {
continue
}
return err
}
}
return nil
}
func (f *fs) pathFor(typ dataType) string {
_, found := overheadMul[typ]
if !found {
panic("unknown data path requested")
}
return filepath.Join(f.path, string(typ))
}
func (f *fs) reservedBytes() int64 {
var out int64
for _, r := range f.reserved {
out += int64(r)
}
return out
}
func (f *fs) reserve(typ dataType, size uint64) error {
f.lk.Lock()
defer f.lk.Unlock()
var fsstat syscall.Statfs_t
if err := syscall.Statfs(f.pathFor(typ), &fsstat); err != nil {
return err
}
fsavail := int64(fsstat.Bavail) * int64(fsstat.Bsize)
avail := fsavail - f.reservedBytes()
need := overheadMul[typ] * size
if int64(need) > avail {
return xerrors.Errorf("not enough space in '%s', need %dB, available %dB (fs: %dB, reserved: %dB)",
f.path,
need,
avail,
fsavail,
f.reservedBytes())
}
f.reserved[typ] += need
return nil
}
func (f *fs) free(typ dataType, sectorSize uint64) {
f.lk.Lock()
defer f.lk.Unlock()
f.reserved[typ] -= overheadMul[typ] * sectorSize
}
package fs
import (
"errors"
"fmt"
"io/ioutil"
"math/big"
"os"
"path/filepath"
"strconv"
"strings"
"sync"
"syscall"
"github.com/filecoin-project/go-address"
logging "github.com/ipfs/go-log/v2"
"golang.org/x/xerrors"
)
var log = logging.Logger("sectorbuilder")
var ErrNotFound = errors.New("sector not found")
var ErrExists = errors.New("sector already exists")
type DataType string
const (
DataCache DataType = "cache"
DataStaging DataType = "staging"
DataSealed DataType = "sealed"
DataUnsealed DataType = "unsealed"
)
var types = []DataType{DataCache, DataStaging, DataSealed, DataUnsealed}
var overheadMul = map[DataType]uint64{ // * sectorSize
DataCache: 11,
DataStaging: 1,
DataSealed: 1,
DataUnsealed: 1,
}
// StoragePath is a path to storage folder (.lotusstorage)
type StoragePath string
// SectorPath is a path to sector data (.lotusstorage/sealed/s-t0101-42))
type SectorPath string
func (p SectorPath) storage() StoragePath {
return StoragePath(filepath.Dir(filepath.Dir(string(p))))
}
func (p SectorPath) typ() DataType {
return DataType(filepath.Base(filepath.Dir(string(p))))
}
func (p SectorPath) id() (uint64, error) {
b := filepath.Base(string(p))
i := strings.LastIndexByte(b, '-')
if i < 0 {
return 0, xerrors.Errorf("malformed sector file name: '%s', expected to be in form 's-[miner]-[id]'", b)
}
id, err := strconv.ParseUint(b[i+1:], 10, 64)
if err != nil {
return 0, xerrors.Errorf("parsing sector id (name: '%s'): %w", b, err)
}
return id, nil
}
func (p SectorPath) miner() (address.Address, error) {
b := filepath.Base(string(p))
fi := strings.IndexByte(b, '-')
li := strings.LastIndexByte(b, '-')
if li < 0 || fi < 0 {
return address.Undef, xerrors.Errorf("malformed sector file name: '%s', expected to be in form 's-[miner]-[id]'", b)
}
return address.NewFromString(b[fi+1 : li])
}
func SectorName(miner address.Address, sectorID uint64) string {
return fmt.Sprintf("s-%s-%d", miner, sectorID)
}
func (p StoragePath) sector(typ DataType, miner address.Address, id uint64) SectorPath {
return SectorPath(filepath.Join(string(p), string(typ), SectorName(miner, id)))
}
type pathInfo struct {
cache bool // TODO: better name?
weight int
}
type FS struct {
paths map[StoragePath]*pathInfo
// in progress actions
// path -> datatype
reserved map[StoragePath]map[DataType]uint64
locks map[SectorPath]chan struct{}
lk sync.Mutex
}
type PathConfig struct {
Path string
Cache bool
Weight int
}
func OpenFs(cfg []PathConfig) *FS {
paths := map[StoragePath]*pathInfo{}
for _, c := range cfg {
paths[StoragePath(c.Path)] = &pathInfo{
cache: c.Cache,
weight: c.Weight,
}
}
return &FS{
paths: paths,
reserved: map[StoragePath]map[DataType]uint64{},
locks: map[SectorPath]chan struct{}{},
}
}
func (f *FS) Init() error {
for path := range f.paths {
for _, dir := range []string{string(path),
filepath.Join(string(path), string(DataCache)),
filepath.Join(string(path), string(DataStaging)),
filepath.Join(string(path), string(DataSealed)),
filepath.Join(string(path), string(DataUnsealed))} {
if err := os.Mkdir(dir, 0755); err != nil {
if os.IsExist(err) {
continue
}
return err
}
}
}
return nil
}
// TODO: fix the locking situation
func (f *FS) FindSector(typ DataType, miner address.Address, id uint64) (out SectorPath, err error) {
// TODO: consider keeping some sort of index at some point
for path := range f.paths {
p := path.sector(typ, miner, id)
_, err := os.Stat(string(p))
if os.IsNotExist(err) {
continue
}
if err != nil {
log.Errorf("error scanning path %s for sector %s (%s): %+v", p, SectorName(miner, id), string(typ))
continue
}
if out != "" {
if !f.paths[p.storage()].cache {
log.Errorf("%s also found in cache at %s", p, out)
return p, nil
}
}
out = p
}
if out == "" {
return "", ErrNotFound
}
return out, nil
}
func (f *FS) findBestPath(size uint64, cache bool) StoragePath {
var best StoragePath
bestw := big.NewInt(0)
bestc := !cache
for path, info := range f.paths {
if info.cache != cache && bestc != info.cache {
continue
}
avail, _, err := f.availableBytes(path)
if err != nil {
log.Errorf("%+v", err)
continue
}
if uint64(avail) < size {
continue
}
w := big.NewInt(avail)
w.Mul(w, big.NewInt(int64(info.weight)))
if w.Cmp(bestw) > 0 {
best = path
bestw = w
bestc = info.cache
}
}
return best
}
func (f *FS) ForceAllocSector(typ DataType, miner address.Address, ssize uint64, cache bool, id uint64) (SectorPath, error) {
for {
spath, err := f.FindSector(typ, miner, id)
if err == ErrNotFound {
break
}
if err != nil {
return "", xerrors.Errorf("looking for existing sector data: %w", err)
}
log.Warn("found existing sector data in %s, cleaning up", spath)
if err := os.RemoveAll(string(spath)); err != nil {
return "", xerrors.Errorf("cleaning up sector data: %w", err)
}
}
return f.AllocSector(typ, miner, ssize, cache, id)
}
// AllocSector finds the best path for this sector to use
func (f *FS) AllocSector(typ DataType, miner address.Address, ssize uint64, cache bool, id uint64) (SectorPath, error) {
{
spath, err := f.FindSector(typ, miner, id)
if err == nil {
return spath, xerrors.Errorf("allocating sector %s: %m", spath, ErrExists)
}
if err != ErrNotFound {
return "", err
}
}
need := overheadMul[typ] * ssize
p := f.findBestPath(need, cache)
if p == "" {
return "", xerrors.New("no suitable path for sector fond")
}
sp := p.sector(typ, miner, id)
return sp, f.reserve(typ, sp, need)
}
// reserve reserves storage for the sector. `path` is the path of the directory containing sectors
func (f *FS) reserve(typ DataType, path SectorPath, size uint64) error {
f.lk.Lock()
defer f.lk.Unlock()
avail, fsavail, err := f.availableBytes(path.storage())
if err != nil {
return err
}
if int64(size) > avail {
return xerrors.Errorf("not enough space in '%s', need %dB, available %dB (fs: %dB, reserved: %dB)",
f.paths,
size,
avail,
fsavail,
f.reservedBytes(path.storage()))
}
if _, ok := f.reserved[path.storage()]; !ok {
f.reserved[path.storage()] = map[DataType]uint64{}
}
f.reserved[path.storage()][typ] += size
return nil
}
func (f *FS) Release(typ DataType, path SectorPath, sectorSize uint64) {
f.lk.Lock()
defer f.lk.Unlock()
f.reserved[path.storage()][typ] -= overheadMul[typ] * sectorSize
}
func (f *FS) List(path StoragePath, typ DataType) ([]SectorPath, error) {
tp := filepath.Join(string(path), string(typ))
ents, err := ioutil.ReadDir(tp)
if err != nil {
return nil, err
}
out := make([]SectorPath, len(ents))
for i, ent := range ents {
out[i] = SectorPath(filepath.Join(tp, ent.Name()))
}
return out, nil
}
func (f *FS) reservedBytes(path StoragePath) int64 {
var out int64
rsvs, ok := f.reserved[path]
if !ok {
return 0
}
for _, r := range rsvs {
out += int64(r)
}
return out
}
func (f *FS) availableBytes(path StoragePath) (int64, int64, error) {
var fsstat syscall.Statfs_t
if err := syscall.Statfs(string(path), &fsstat); err != nil {
return 0, 0, err
}
fsavail := int64(fsstat.Bavail) * int64(fsstat.Bsize)
avail := fsavail - f.reservedBytes(path)
return avail, fsavail, nil
}
package fs
import (
"testing"
"github.com/stretchr/testify/require"
)
func TestPathUtils(t *testing.T) {
sp := SectorPath("/aoe/aaa-oeu/cache/s-t0999-84")
i, err := sp.id()
require.NoError(t, err)
require.EqualValues(t, 84, i)
a, err := sp.miner()
require.NoError(t, err)
require.Equal(t, "t0999", a.String())
require.Equal(t, DataCache, sp.typ())
require.Equal(t, "/aoe/aaa-oeu", string(sp.storage()))
}
package fs
import "context"
// This very explicitly doesn't use fs locks - we generally only
// care about sector ownership in the storage miner process, and
// using fs locks would make things like seals workers on NFS quite tricky
// TODO: RW eventually
func (f *FS) Lock(ctx context.Context, sector SectorPath) error {
for {
f.lk.Lock()
w, ok := f.locks[sector]
if !ok {
f.locks[sector] = make(chan struct{})
f.lk.Unlock()
return nil
}
f.lk.Unlock()
log.Infof("Waiting for lock on %s", string(sector))
select {
case <-w:
case <-ctx.Done():
return ctx.Err()
}
}
}
func (f *FS) Unlock(sector SectorPath) {
f.lk.Lock()
defer f.lk.Unlock()
close(f.locks[sector])
delete(f.locks, sector)
}
package fs
import (
"io/ioutil"
"os"
"path/filepath"
dcopy "github.com/otiai10/copy"
"golang.org/x/xerrors"
)
func (f *FS) MigrateTo(to *FS, ssize uint64, symlink bool) error {
for path := range f.paths {
for _, dataType := range types {
sectors, err := f.List(path, dataType)
if err != nil {
return err
}
for _, sector := range sectors {
if err := f.migrateSector(to, ssize, sector, symlink); err != nil {
return err
}
}
}
}
return nil
}
func (f *FS) migrateSector(to *FS, ssize uint64, sector SectorPath, symlink bool) error {
id, err := sector.id()
if err != nil {
return err
}
m, err := sector.miner()
if err != nil {
return err
}
newp, err := to.AllocSector(sector.typ(), m, ssize, false, id)
if err != nil {
return err
}
inf, err := os.Stat(string(sector))
if err != nil {
return err
}
if inf.IsDir() {
return migrateDir(string(sector), string(newp), symlink)
}
return migrateFile(string(sector), string(newp), symlink)
}
func migrateDir(from, to string, symlink bool) error {
tost, err := os.Stat(to)
if err != nil {
if !os.IsNotExist(err) {
return err
}
if err := os.MkdirAll(to, 0755); err != nil {
return err
}
} else if !tost.IsDir() {
return xerrors.Errorf("target %q already exists and is a file (expected directory)")
}
dirents, err := ioutil.ReadDir(from)
if err != nil {
return err
}
for _, inf := range dirents {
n := inf.Name()
if inf.IsDir() {
if err := migrateDir(filepath.Join(from, n), filepath.Join(to, n), symlink); err != nil {
return err
}
} else {
if err := migrateFile(filepath.Join(from, n), filepath.Join(to, n), symlink); err != nil {
return err
}
}
}
return nil
}
func migrateFile(from, to string, symlink bool) error {
if symlink {
return os.Symlink(from, to)
}
return dcopy.Copy(from, to)
}
......@@ -13,6 +13,7 @@ require (
github.com/ipfs/go-datastore v0.1.1
github.com/ipfs/go-ipld-format v0.0.2 // indirect
github.com/ipfs/go-log v1.0.0
github.com/ipfs/go-log/v2 v2.0.2
github.com/jbenet/goprocess v0.1.3 // indirect
github.com/mattn/go-colorable v0.1.2 // indirect
github.com/mattn/go-isatty v0.0.9 // indirect
......
......@@ -5,11 +5,13 @@ import (
"io"
"github.com/filecoin-project/go-address"
"github.com/filecoin-project/go-sectorbuilder/fs"
)
type Interface interface {
RateLimit() func()
AddPiece(uint64, uint64, io.Reader, []uint64) (PublicPieceInfo, error)
AddPiece(context.Context, uint64, uint64, io.Reader, []uint64) (PublicPieceInfo, error)
SectorSize() uint64
AcquireSectorId() (uint64, error)
Scrub(SortedPublicSectorInfo) []*Fault
......@@ -21,9 +23,11 @@ type Interface interface {
SealPreCommit(context.Context, uint64, SealTicket, []PublicPieceInfo) (RawSealPreCommitOutput, error)
SealCommit(context.Context, uint64, SealTicket, SealSeed, []PublicPieceInfo, RawSealPreCommitOutput) ([]byte, error)
ReadPieceFromSealedSector(sectorID uint64, offset uint64, size uint64, ticket []byte, commD []byte) (io.ReadCloser, error)
ReadPieceFromSealedSector(ctx context.Context, sectorID uint64, offset uint64, size uint64, ticket []byte, commD []byte) (io.ReadCloser, error)
GetPath(string, string) (string, error)
SectorPath(typ fs.DataType, sectorID uint64) (fs.SectorPath, error)
AllocSectorPath(typ fs.DataType, sectorID uint64, cache bool) (fs.SectorPath, error)
ReleaseSector(fs.DataType, fs.SectorPath)
CanCommit(sectorID uint64) (bool, error)
WorkerStats() WorkerStats
AddWorker(context.Context, WorkerCfg) (<-chan WorkerTask, error)
......
......@@ -2,6 +2,7 @@ package sectorbuilder
import (
"github.com/filecoin-project/go-address"
"github.com/filecoin-project/go-sectorbuilder/fs"
"github.com/ipfs/go-datastore"
)
......@@ -14,7 +15,11 @@ func TempSectorbuilderDir(dir string, sectorSize uint64, ds datastore.Batching)
sb, err := New(&Config{
SectorSize: sectorSize,
Dir: dir,
Paths: []fs.PathConfig{{
Path: dir,
Cache: false,
Weight: 1,
}},
WorkerThreads: 2,
Miner: addr,
......
......@@ -7,6 +7,8 @@ import (
sectorbuilder "github.com/filecoin-project/filecoin-ffi"
"golang.org/x/xerrors"
"github.com/filecoin-project/go-sectorbuilder/fs"
)
type Fault struct {
......@@ -29,10 +31,11 @@ func (sb *SectorBuilder) Scrub(sectorSet sectorbuilder.SortedPublicSectorInfo) [
}
func (sb *SectorBuilder) checkSector(sectorID uint64) error {
cache, err := sb.sectorCacheDir(sectorID)
scache, err := sb.SectorPath(fs.DataCache, sectorID)
if err != nil {
return xerrors.Errorf("getting sector cache dir: %w", err)
}
cache := string(scache)
if err := assertFile(filepath.Join(cache, "p_aux"), 96, 96); err != nil {
return err
......@@ -54,12 +57,12 @@ func (sb *SectorBuilder) checkSector(sectorID uint64) error {
return xerrors.Errorf("found %d files in %s, expected 3", len(dent), cache)
}
sealed, err := sb.SealedSectorPath(sectorID)
sealed, err := sb.SectorPath(fs.DataSealed, sectorID)
if err != nil {
return xerrors.Errorf("getting sealed sector path: %w", err)
return xerrors.Errorf("getting sealed sector paths: %w", err)
}
if err := assertFile(filepath.Join(sealed), sb.ssize, sb.ssize); err != nil {
if err := assertFile(filepath.Join(string(sealed)), sb.ssize, sb.ssize); err != nil {
return err
}
......
......@@ -5,24 +5,18 @@ package sectorbuilder
import (
"context"
"io"
"io/ioutil"
"os"
"sync/atomic"
ffi "github.com/filecoin-project/filecoin-ffi"
"golang.org/x/xerrors"
fs "github.com/filecoin-project/go-sectorbuilder/fs"
)
var _ Interface = &SectorBuilder{}
func (sb *SectorBuilder) AddPiece(pieceSize uint64, sectorId uint64, file io.Reader, existingPieceSizes []uint64) (PublicPieceInfo, error) {
fs := sb.filesystem
if err := fs.reserve(dataStaging, sb.ssize); err != nil {
return PublicPieceInfo{}, err
}
defer fs.free(dataStaging, sb.ssize)
func (sb *SectorBuilder) AddPiece(ctx context.Context, pieceSize uint64, sectorId uint64, file io.Reader, existingPieceSizes []uint64) (PublicPieceInfo, error) {
atomic.AddInt32(&sb.addPieceWait, 1)
ret := sb.RateLimit()
atomic.AddInt32(&sb.addPieceWait, -1)
......@@ -33,10 +27,36 @@ func (sb *SectorBuilder) AddPiece(pieceSize uint64, sectorId uint64, file io.Rea
return PublicPieceInfo{}, err
}
stagedFile, err := sb.stagedSectorFile(sectorId)
var stagedFile *os.File
var stagedPath fs.SectorPath
if len(existingPieceSizes) == 0 {
stagedPath, err = sb.AllocSectorPath(fs.DataStaging, sectorId, true)
if err != nil {
return PublicPieceInfo{}, xerrors.Errorf("allocating sector: %w", err)
}
stagedFile, err = os.Create(string(stagedPath))
if err != nil {
return PublicPieceInfo{}, xerrors.Errorf("opening sector file: %w", err)
}
defer sb.filesystem.Release(fs.DataStaging, stagedPath, sb.ssize)
} else {
stagedPath, err = sb.SectorPath(fs.DataStaging, sectorId)
if err != nil {
return PublicPieceInfo{}, xerrors.Errorf("getting sector path: %w", err)
}
stagedFile, err = os.OpenFile(string(stagedPath), os.O_RDWR, 0644)
if err != nil {
return PublicPieceInfo{}, xerrors.Errorf("opening sector file: %w", err)
}
}
if err := sb.filesystem.Lock(ctx, stagedPath); err != nil {
return PublicPieceInfo{}, err
}
defer sb.filesystem.Unlock(stagedPath)
_, _, commP, err := ffi.WriteWithAlignment(f, pieceSize, stagedFile, existingPieceSizes)
if err != nil {
......@@ -57,13 +77,22 @@ func (sb *SectorBuilder) AddPiece(pieceSize uint64, sectorId uint64, file io.Rea
}, werr()
}
func (sb *SectorBuilder) ReadPieceFromSealedSector(sectorID uint64, offset uint64, size uint64, ticket []byte, commD []byte) (io.ReadCloser, error) {
fs := sb.filesystem
func (sb *SectorBuilder) ReadPieceFromSealedSector(ctx context.Context, sectorID uint64, offset uint64, size uint64, ticket []byte, commD []byte) (io.ReadCloser, error) {
sfs := sb.filesystem
// TODO: this needs to get smarter when we start supporting partial unseals
unsealedPath, err := sfs.AllocSector(fs.DataUnsealed, sb.Miner, sb.ssize, true, sectorID)
if err != nil {
if !xerrors.Is(err, fs.ErrExists) {
return nil, xerrors.Errorf("AllocSector: %w", err)
}
}
defer sfs.Release(fs.DataUnsealed, unsealedPath, sb.ssize)
if err := fs.reserve(dataUnsealed, sb.ssize); err != nil { // TODO: this needs to get smarter when we start supporting partial unseals
if err := sfs.Lock(ctx, unsealedPath); err != nil {
return nil, err
}
defer fs.free(dataUnsealed, sb.ssize)
defer sfs.Unlock(unsealedPath)
atomic.AddInt32(&sb.unsealWait, 1)
// TODO: Don't wait if cached
......@@ -74,22 +103,20 @@ func (sb *SectorBuilder) ReadPieceFromSealedSector(sectorID uint64, offset uint6
sb.unsealLk.Lock() // TODO: allow unsealing unrelated sectors in parallel
defer sb.unsealLk.Unlock()
cacheDir, err := sb.sectorCacheDir(sectorID)
cacheDir, err := sb.SectorPath(fs.DataCache, sectorID)
if err != nil {
return nil, err
}
sealedPath, err := sb.SealedSectorPath(sectorID)
sealedPath, err := sb.SectorPath(fs.DataSealed, sectorID)
if err != nil {
return nil, err
}
unsealedPath := sb.unsealedSectorPath(sectorID)
// TODO: GC for those
// (Probably configurable count of sectors to be kept unsealed, and just
// remove last used one (or use whatever other cache policy makes sense))
f, err := os.OpenFile(unsealedPath, os.O_RDONLY, 0644)
f, err := os.OpenFile(string(unsealedPath), os.O_RDONLY, 0644)
if err != nil {
if !os.IsNotExist(err) {
return nil, err
......@@ -103,9 +130,9 @@ func (sb *SectorBuilder) ReadPieceFromSealedSector(sectorID uint64, offset uint6
err = ffi.Unseal(sb.ssize,
PoRepProofPartitions,
cacheDir,
sealedPath,
unsealedPath,
string(cacheDir),
string(sealedPath),
string(unsealedPath),
sectorID,
addressToProverID(sb.Miner),
tkt,
......@@ -114,7 +141,7 @@ func (sb *SectorBuilder) ReadPieceFromSealedSector(sectorID uint64, offset uint6
return nil, xerrors.Errorf("unseal failed: %w", err)
}
f, err = os.OpenFile(unsealedPath, os.O_RDONLY, 0644)
f, err = os.OpenFile(string(unsealedPath), os.O_RDONLY, 0644)
if err != nil {
return nil, err
}
......@@ -136,53 +163,32 @@ func (sb *SectorBuilder) ReadPieceFromSealedSector(sectorID uint64, offset uint6
}
func (sb *SectorBuilder) SealPreCommit(ctx context.Context, sectorID uint64, ticket SealTicket, pieces []PublicPieceInfo) (RawSealPreCommitOutput, error) {
fs := sb.filesystem
sfs := sb.filesystem
cacheDir, err := sb.sectorCacheDir(sectorID)
cacheDir, err := sfs.ForceAllocSector(fs.DataCache, sb.Miner, sb.ssize, true, sectorID)
if err != nil {
return RawSealPreCommitOutput{}, xerrors.Errorf("getting cache dir: %w", err)
}
cached, err := ioutil.ReadDir(cacheDir)
if err != nil {
return RawSealPreCommitOutput{}, xerrors.Errorf("reading cache dir: %w", err)
}
if len(cached) > 0 {
// TODO: can we read t_aux or p_aux to check if we have the correct thing sealed here already?
// (need to check ticket)
log.Warnf("precommit: cache dir %s contains files %v, cleaning up", cacheDir, cached)
if err := os.RemoveAll(cacheDir); err != nil {
return RawSealPreCommitOutput{}, xerrors.Errorf("removing cache dir %s: %w", cacheDir, err)
}
if err := os.Mkdir(string(cacheDir), 0755); err != nil {
return RawSealPreCommitOutput{}, err
}
sealedPath, err := sb.SealedSectorPath(sectorID)
sealedPath, err := sfs.ForceAllocSector(fs.DataSealed, sb.Miner, sb.ssize, true, sectorID)
if err != nil {
return RawSealPreCommitOutput{}, xerrors.Errorf("getting sealed sector path: %w", err)
return RawSealPreCommitOutput{}, xerrors.Errorf("getting sealed sector paths: %w", err)
}
if _, err := os.Stat(sealedPath); !os.IsNotExist(err) {
if err != nil {
return RawSealPreCommitOutput{}, xerrors.Errorf("stat cache dir: %w", err)
}
defer sfs.Release(fs.DataCache, cacheDir, sb.ssize)
defer sfs.Release(fs.DataSealed, sealedPath, sb.ssize)
log.Warnf("precommit: found sealed sector %s, cleaning up", sealedPath)
if err := os.Remove(sealedPath); err != nil {
return RawSealPreCommitOutput{}, xerrors.Errorf("removing sealed sector %s: %w", sealedPath, err)
if err := sfs.Lock(ctx, cacheDir); err != nil {
return RawSealPreCommitOutput{}, xerrors.Errorf("lock cache: %w", err)
}
if err := sfs.Lock(ctx, sealedPath); err != nil {
return RawSealPreCommitOutput{}, xerrors.Errorf("lock sealed: %w", err)
}
if err := fs.reserve(dataCache, sb.ssize); err != nil {
return RawSealPreCommitOutput{}, err
}
defer fs.free(dataCache, sb.ssize)
if err := fs.reserve(dataSealed, sb.ssize); err != nil {
return RawSealPreCommitOutput{}, err
}
defer fs.free(dataSealed, sb.ssize)
defer sfs.Unlock(cacheDir)
defer sfs.Unlock(sealedPath)
call := workerCall{
task: WorkerTask{
......@@ -226,7 +232,7 @@ func (sb *SectorBuilder) SealPreCommit(ctx context.Context, sectorID uint64, tic
<-sb.rateLimit
}()
e, err := os.OpenFile(sealedPath, os.O_RDWR|os.O_CREATE, 0644)
e, err := os.OpenFile(string(sealedPath), os.O_RDWR|os.O_CREATE, 0644)
if err != nil {
return RawSealPreCommitOutput{}, xerrors.Errorf("ensuring sealed file exists: %w", err)
}
......@@ -243,15 +249,17 @@ func (sb *SectorBuilder) SealPreCommit(ctx context.Context, sectorID uint64, tic
return RawSealPreCommitOutput{}, xerrors.Errorf("aggregated piece sizes don't match sector size: %d != %d (%d)", sum, ussize, int64(ussize-sum))
}
stagedPath := sb.StagedSectorPath(sectorID)
stagedPath, err := sb.SectorPath(fs.DataStaging, sectorID)
if err != nil {
return RawSealPreCommitOutput{}, xerrors.Errorf("get staged: %w", err)
}
// TODO: context cancellation respect
rspco, err := ffi.SealPreCommit(
sb.ssize,
PoRepProofPartitions,
cacheDir,
stagedPath,
sealedPath,
string(cacheDir),
string(stagedPath),
string(sealedPath),
sectorID,
addressToProverID(sb.Miner),
ticket.TicketBytes,
......@@ -264,22 +272,26 @@ func (sb *SectorBuilder) SealPreCommit(ctx context.Context, sectorID uint64, tic
return RawSealPreCommitOutput(rspco), nil
}
func (sb *SectorBuilder) sealCommitLocal(sectorID uint64, ticket SealTicket, seed SealSeed, pieces []PublicPieceInfo, rspco RawSealPreCommitOutput) (proof []byte, err error) {
func (sb *SectorBuilder) sealCommitLocal(ctx context.Context, sectorID uint64, ticket SealTicket, seed SealSeed, pieces []PublicPieceInfo, rspco RawSealPreCommitOutput) (proof []byte, err error) {
atomic.AddInt32(&sb.commitWait, -1)
defer func() {
<-sb.rateLimit
}()
cacheDir, err := sb.sectorCacheDir(sectorID)
cacheDir, err := sb.SectorPath(fs.DataCache, sectorID)
if err != nil {
return nil, err
}
if err := sb.filesystem.Lock(ctx, cacheDir); err != nil {
return nil, err
}
defer sb.filesystem.Unlock(cacheDir)
proof, err = ffi.SealCommit(
sb.ssize,
PoRepProofPartitions,
cacheDir,
string(cacheDir),
sectorID,
addressToProverID(sb.Miner),
ticket.TicketBytes,
......@@ -329,7 +341,7 @@ func (sb *SectorBuilder) SealCommit(ctx context.Context, sectorID uint64, ticket
case sb.commitTasks <- call:
proof, err = sb.sealCommitRemote(call)
case rl <- struct{}{}:
proof, err = sb.sealCommitLocal(sectorID, ticket, seed, pieces, rspco)
proof, err = sb.sealCommitLocal(ctx, sectorID, ticket, seed, pieces, rspco)
case <-ctx.Done():
return nil, ctx.Err()
}
......
......@@ -2,9 +2,6 @@ package sectorbuilder
import (
"fmt"
"io/ioutil"
"os"
"path/filepath"
"strconv"
"sync/atomic"
......@@ -12,8 +9,9 @@ import (
"github.com/filecoin-project/go-address"
datastore "github.com/ipfs/go-datastore"
logging "github.com/ipfs/go-log"
dcopy "github.com/otiai10/copy"
"golang.org/x/xerrors"
"github.com/filecoin-project/go-sectorbuilder/fs"
)
const PoStReservedWorkers = 1
......@@ -46,7 +44,7 @@ type Config struct {
NoCommit bool
NoPreCommit bool
Dir string
Paths []fs.PathConfig
_ struct{} // guard against nameless init
}
......@@ -84,7 +82,7 @@ func New(cfg *Config, ds datastore.Batching) (*SectorBuilder, error) {
ssize: cfg.SectorSize,
lastID: lastUsedID,
filesystem: openFs(cfg.Dir),
filesystem: fs.OpenFs(cfg.Paths),
Miner: cfg.Miner,
......@@ -101,7 +99,7 @@ func New(cfg *Config, ds datastore.Batching) (*SectorBuilder, error) {
stopping: make(chan struct{}),
}
if err := sb.filesystem.init(); err != nil {
if err := sb.filesystem.Init(); err != nil {
return nil, xerrors.Errorf("initializing sectorbuilder filesystem: %w", err)
}
......@@ -115,7 +113,7 @@ func NewStandalone(cfg *Config) (*SectorBuilder, error) {
ssize: cfg.SectorSize,
Miner: cfg.Miner,
filesystem: openFs(cfg.Dir),
filesystem: fs.OpenFs(cfg.Paths),
taskCtr: 1,
remotes: map[int]*remote{},
......@@ -123,7 +121,7 @@ func NewStandalone(cfg *Config) (*SectorBuilder, error) {
stopping: make(chan struct{}),
}
if err := sb.filesystem.init(); err != nil {
if err := sb.filesystem.Init(); err != nil {
return nil, xerrors.Errorf("initializing sectorbuilder filesystem: %w", err)
}
......@@ -246,21 +244,21 @@ func (sb *SectorBuilder) pubSectorToPriv(sectorInfo SortedPublicSectorInfo, faul
continue
}
cachePath, err := sb.sectorCacheDir(s.SectorID)
cachePath, err := sb.SectorPath(fs.DataCache, s.SectorID)
if err != nil {
return SortedPrivateSectorInfo{}, xerrors.Errorf("getting cache path for sector %d: %w", s.SectorID, err)
return SortedPrivateSectorInfo{}, xerrors.Errorf("getting cache paths for sector %d: %w", s.SectorID, err)
}
sealedPath, err := sb.SealedSectorPath(s.SectorID)
sealedPath, err := sb.SectorPath(fs.DataSealed, s.SectorID)
if err != nil {
return SortedPrivateSectorInfo{}, xerrors.Errorf("getting sealed path for sector %d: %w", s.SectorID, err)
return SortedPrivateSectorInfo{}, xerrors.Errorf("getting sealed paths for sector %d: %w", s.SectorID, err)
}
out = append(out, ffi.PrivateSectorInfo{
SectorID: s.SectorID,
CommR: s.CommR,
CacheDirPath: cachePath,
SealedSectorPath: sealedPath,
CacheDirPath: string(cachePath),
SealedSectorPath: string(sealedPath),
})
}
return ffi.NewSortedPrivateSectorInfo(out...), nil
......@@ -275,16 +273,8 @@ func fallbackPostChallengeCount(sectors uint64, faults uint64) uint64 {
}
func (sb *SectorBuilder) ImportFrom(osb *SectorBuilder, symlink bool) error {
if err := migrate(osb.filesystem.pathFor(dataCache), sb.filesystem.pathFor(dataCache), symlink); err != nil {
return err
}
if err := migrate(osb.filesystem.pathFor(dataStaging), sb.filesystem.pathFor(dataStaging), symlink); err != nil {
return err
}
if err := migrate(osb.filesystem.pathFor(dataSealed), sb.filesystem.pathFor(dataSealed), symlink); err != nil {
return err
if err := osb.filesystem.MigrateTo(sb.filesystem, sb.ssize, symlink); err != nil {
return xerrors.Errorf("migrating sector data: %w", err)
}
val, err := osb.ds.Get(lastSectorIdKey)
......@@ -314,61 +304,6 @@ func (sb *SectorBuilder) SetLastSectorID(id uint64) error {
return nil
}
func migrate(from, to string, symlink bool) error {
st, err := os.Stat(from)
if err != nil {
return err
}
if st.IsDir() {
return migrateDir(from, to, symlink)
}
return migrateFile(from, to, symlink)
}
func migrateDir(from, to string, symlink bool) error {
tost, err := os.Stat(to)
if err != nil {
if !os.IsNotExist(err) {
return err
}
if err := os.MkdirAll(to, 0755); err != nil {
return err
}
} else if !tost.IsDir() {
return xerrors.Errorf("target %q already exists and is a file (expected directory)")
}
dirents, err := ioutil.ReadDir(from)
if err != nil {
return err
}
for _, inf := range dirents {
n := inf.Name()
if inf.IsDir() {
if err := migrate(filepath.Join(from, n), filepath.Join(to, n), symlink); err != nil {
return err
}
} else {
if err := migrate(filepath.Join(from, n), filepath.Join(to, n), symlink); err != nil {
return err
}
}
}
return nil
}
func migrateFile(from, to string, symlink bool) error {
if symlink {
return os.Symlink(from, to)
}
return dcopy.Copy(from, to)
}
func (sb *SectorBuilder) Stop() {
close(sb.stopping)
}
......@@ -43,7 +43,7 @@ func (s *seal) precommit(t *testing.T, sb *sectorbuilder.SectorBuilder, sid uint
var err error
r := io.LimitReader(rand.New(rand.NewSource(42+int64(sid))), int64(dlen))
s.ppi, err = sb.AddPiece(dlen, sid, r, []uint64{})
s.ppi, err = sb.AddPiece(context.TODO(), dlen, sid, r, []uint64{})
if err != nil {
t.Fatalf("%+v", err)
}
......@@ -254,7 +254,7 @@ func TestSealPoStNoCommit(t *testing.T) {
t.Fatalf("%+v", err)
}
if err := sb.TrimCache(1); err != nil {
if err := sb.TrimCache(context.TODO(), 1); err != nil {
t.Fatal(err)
}
......
......@@ -6,6 +6,8 @@ import (
ffi "github.com/filecoin-project/filecoin-ffi"
"github.com/filecoin-project/go-address"
"github.com/ipfs/go-datastore"
"github.com/filecoin-project/go-sectorbuilder/fs"
)
type SortedPublicSectorInfo = ffi.SortedPublicSectorInfo
......@@ -64,7 +66,7 @@ type SectorBuilder struct {
unsealWait int32
fsLk sync.Mutex //nolint: struckcheck
filesystem *fs // TODO: multi-fs support
filesystem *fs.FS // TODO: multi-fs support
stopping chan struct{}
}
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment