Commit 327428a8 authored by Łukasz Magiera's avatar Łukasz Magiera
Browse files

FinalizeSector

parent f3d19bcc
......@@ -27,7 +27,7 @@ func (sb *SectorBuilder) AllocSectorPath(typ fs.DataType, sectorID uint64, cache
}
func (sb *SectorBuilder) ReleaseSector(typ fs.DataType, path fs.SectorPath) {
sb.filesystem.Release(typ, path, sb.ssize)
sb.filesystem.Release(path, sb.ssize)
}
func (sb *SectorBuilder) TrimCache(ctx context.Context, sectorID uint64) error {
......
......@@ -3,14 +3,12 @@ package fs
import (
"errors"
"fmt"
"io/ioutil"
"math/big"
"os"
"path/filepath"
"strconv"
"strings"
"sync"
"syscall"
"github.com/filecoin-project/go-address"
logging "github.com/ipfs/go-log/v2"
......@@ -21,6 +19,7 @@ var log = logging.Logger("sectorbuilder")
var ErrNotFound = errors.New("sector not found")
var ErrExists = errors.New("sector already exists")
var ErrNoSuitablePath = errors.New("no suitable path for sector fond")
type DataType string
......@@ -83,7 +82,7 @@ func SectorName(miner address.Address, sectorID uint64) string {
return fmt.Sprintf("s-%s-%d", miner, sectorID)
}
func (p StoragePath) sector(typ DataType, miner address.Address, id uint64) SectorPath {
func (p StoragePath) Sector(typ DataType, miner address.Address, id uint64) SectorPath {
return SectorPath(filepath.Join(string(p), string(typ), SectorName(miner, id)))
}
......@@ -152,7 +151,7 @@ func (f *FS) FindSector(typ DataType, miner address.Address, id uint64) (out Sec
// TODO: consider keeping some sort of index at some point
for path := range f.paths {
p := path.sector(typ, miner, id)
p := path.Sector(typ, miner, id)
_, err := os.Stat(string(p))
if os.IsNotExist(err) {
......@@ -178,13 +177,13 @@ func (f *FS) FindSector(typ DataType, miner address.Address, id uint64) (out Sec
return out, nil
}
func (f *FS) findBestPath(size uint64, cache bool) StoragePath {
func (f *FS) findBestPath(size uint64, cache bool, strict bool) StoragePath {
var best StoragePath
bestw := big.NewInt(0)
bestc := !cache
for path, info := range f.paths {
if info.cache != cache && bestc != info.cache {
if info.cache != cache && (bestc != info.cache || strict) {
continue
}
......@@ -243,88 +242,53 @@ func (f *FS) AllocSector(typ DataType, miner address.Address, ssize uint64, cach
need := overheadMul[typ] * ssize
p := f.findBestPath(need, cache)
p := f.findBestPath(need, cache, cache)
if p == "" {
return "", xerrors.New("no suitable path for sector fond")
return "", ErrNoSuitablePath
}
sp := p.sector(typ, miner, id)
sp := p.Sector(typ, miner, id)
return sp, f.reserve(typ, sp, need)
return sp, f.reserve(typ, sp.storage(), need)
}
// reserve reserves storage for the sector. `path` is the path of the directory containing sectors
func (f *FS) reserve(typ DataType, path SectorPath, size uint64) error {
f.lk.Lock()
defer f.lk.Unlock()
avail, fsavail, err := f.availableBytes(path.storage())
if err != nil {
return err
func (f *FS) PrepareCacheMove(sector SectorPath, ssize uint64, tocache bool) (SectorPath, error) {
p := f.findBestPath(ssize, tocache, true)
if p == "" {
return "", ErrNoSuitablePath
}
if int64(size) > avail {
return xerrors.Errorf("not enough space in '%s', need %dB, available %dB (fs: %dB, reserved: %dB)",
f.paths,
size,
avail,
fsavail,
f.reservedBytes(path.storage()))
m, err := sector.miner()
if err != nil {
return "", err
}
if _, ok := f.reserved[path.storage()]; !ok {
f.reserved[path.storage()] = map[DataType]uint64{}
id, err := sector.id()
if err != nil {
return "", err
}
f.reserved[path.storage()][typ] += size
return nil
return p.Sector(sector.typ(), m, id), f.reserve(sector.typ(), p, ssize)
}
func (f *FS) Release(typ DataType, path SectorPath, sectorSize uint64) {
f.lk.Lock()
defer f.lk.Unlock()
f.reserved[path.storage()][typ] -= overheadMul[typ] * sectorSize
}
func (f *FS) List(path StoragePath, typ DataType) ([]SectorPath, error) {
tp := filepath.Join(string(path), string(typ))
ents, err := ioutil.ReadDir(tp)
func (f *FS) MoveSector(from, to SectorPath) error {
inf, err := os.Stat(string(from))
if err != nil {
return nil, err
}
out := make([]SectorPath, len(ents))
for i, ent := range ents {
out[i] = SectorPath(filepath.Join(tp, ent.Name()))
return xerrors.Errorf("stat %s: %w", from, err)
}
return out, nil
}
func (f *FS) reservedBytes(path StoragePath) int64 {
var out int64
rsvs, ok := f.reserved[path]
if !ok {
return 0
if inf.IsDir() {
err = migrateDir(string(from), string(to), false)
} else {
err = migrateFile(string(from), string(to), false)
}
for _, r := range rsvs {
out += int64(r)
if err != nil {
return xerrors.Errorf("migrate sector %s -> %s: %w", from, to, err)
}
return out
}
func (f *FS) availableBytes(path StoragePath) (int64, int64, error) {
var fsstat syscall.Statfs_t
// TODO: run some quick checks
if err := syscall.Statfs(string(path), &fsstat); err != nil {
return 0, 0, err
if err := os.RemoveAll(string(from)); err != nil {
return xerrors.Errorf("cleanup %s: %w", from, err)
}
fsavail := int64(fsstat.Bavail) * int64(fsstat.Bsize)
avail := fsavail - f.reservedBytes(path)
return avail, fsavail, nil
return nil
}
......@@ -94,5 +94,7 @@ func migrateFile(from, to string, symlink bool) error {
return os.Symlink(from, to)
}
log.Info("copy %s -> %s", from, to)
return dcopy.Copy(from, to)
}
......@@ -11,6 +11,7 @@ import (
type Interface interface {
RateLimit() func()
AddPiece(context.Context, uint64, uint64, io.Reader, []uint64) (PublicPieceInfo, error)
SectorSize() uint64
AcquireSectorId() (uint64, error)
......@@ -22,6 +23,9 @@ type Interface interface {
SealPreCommit(context.Context, uint64, SealTicket, []PublicPieceInfo) (RawSealPreCommitOutput, error)
SealCommit(context.Context, uint64, SealTicket, SealSeed, []PublicPieceInfo, RawSealPreCommitOutput) ([]byte, error)
// FinalizeSector cleans up cache, and moves it to storage filesystem
FinalizeSector(context.Context, uint64) error
DropStaged(context.Context, uint64) error
ReadPieceFromSealedSector(ctx context.Context, sectorID uint64, offset uint64, size uint64, ticket []byte, commD []byte) (io.ReadCloser, error)
......
......@@ -6,8 +6,8 @@ import (
"github.com/ipfs/go-datastore"
)
func TempSectorbuilderDir(dir string, sectorSize uint64, ds datastore.Batching) (*SectorBuilder, error) {
addr, err := address.NewFromString("t3vfxagwiegrywptkbmyohqqbfzd7xzbryjydmxso4hfhgsnv6apddyihltsbiikjf3lm7x2myiaxhuc77capq")
func TempSectorbuilderDir(paths []fs.PathConfig, sectorSize uint64, ds datastore.Batching) (*SectorBuilder, error) {
addr, err := address.NewFromString("t0123")
if err != nil {
return nil, err
}
......@@ -15,11 +15,7 @@ func TempSectorbuilderDir(dir string, sectorSize uint64, ds datastore.Batching)
sb, err := New(&Config{
SectorSize: sectorSize,
Paths: []fs.PathConfig{{
Path: dir,
Cache: false,
Weight: 1,
}},
Paths: paths,
WorkerThreads: 2,
Miner: addr,
......@@ -30,3 +26,11 @@ func TempSectorbuilderDir(dir string, sectorSize uint64, ds datastore.Batching)
return sb, nil
}
func SimplePath(dir string) []fs.PathConfig {
return []fs.PathConfig{{
Path: dir,
Cache: false,
Weight: 1,
}}
}
......@@ -40,7 +40,7 @@ func (sb *SectorBuilder) AddPiece(ctx context.Context, pieceSize uint64, sectorI
return PublicPieceInfo{}, xerrors.Errorf("opening sector file: %w", err)
}
defer sb.filesystem.Release(fs.DataStaging, stagedPath, sb.ssize)
defer sb.filesystem.Release(stagedPath, sb.ssize)
} else {
stagedPath, err = sb.SectorPath(fs.DataStaging, sectorId)
if err != nil {
......@@ -87,7 +87,7 @@ func (sb *SectorBuilder) ReadPieceFromSealedSector(ctx context.Context, sectorID
return nil, xerrors.Errorf("AllocSector: %w", err)
}
}
defer sfs.Release(fs.DataUnsealed, unsealedPath, sb.ssize)
defer sfs.Release(unsealedPath, sb.ssize)
if err := sfs.Lock(ctx, unsealedPath); err != nil {
return nil, err
......@@ -178,8 +178,8 @@ func (sb *SectorBuilder) SealPreCommit(ctx context.Context, sectorID uint64, tic
return RawSealPreCommitOutput{}, xerrors.Errorf("getting sealed sector paths: %w", err)
}
defer sfs.Release(fs.DataCache, cacheDir, sb.ssize)
defer sfs.Release(fs.DataSealed, sealedPath, sb.ssize)
defer sfs.Release(cacheDir, sb.ssize)
defer sfs.Release(sealedPath, sb.ssize)
if err := sfs.Lock(ctx, cacheDir); err != nil {
return RawSealPreCommitOutput{}, xerrors.Errorf("lock cache: %w", err)
......
package sectorbuilder
import (
"context"
"fmt"
"os"
"strconv"
"sync/atomic"
......@@ -244,7 +246,7 @@ func (sb *SectorBuilder) pubSectorToPriv(sectorInfo SortedPublicSectorInfo, faul
continue
}
cachePath, err := sb.SectorPath(fs.DataCache, s.SectorID)
cachePath, err := sb.SectorPath(fs.DataCache, s.SectorID) // TODO: LOCK!
if err != nil {
return SortedPrivateSectorInfo{}, xerrors.Errorf("getting cache paths for sector %d: %w", s.SectorID, err)
}
......@@ -272,6 +274,84 @@ func fallbackPostChallengeCount(sectors uint64, faults uint64) uint64 {
return challengeCount
}
func (sb *SectorBuilder) FinalizeSector(ctx context.Context, id uint64) error {
sealed, err := sb.filesystem.FindSector(fs.DataSealed, sb.Miner, id)
if err != nil {
return xerrors.Errorf("getting sealed sector: %w", err)
}
cache, err := sb.filesystem.FindSector(fs.DataCache, sb.Miner, id)
if err != nil {
return xerrors.Errorf("getting sector cache: %w", err)
}
// todo: flag to just remove
staged, err := sb.filesystem.FindSector(fs.DataStaging, sb.Miner, id)
if err != nil {
return xerrors.Errorf("getting staged sector: %w", err)
}
{
if err := sb.filesystem.Lock(ctx, sealed); err != nil {
return err
}
defer sb.filesystem.Unlock(sealed)
if err := sb.filesystem.Lock(ctx, cache); err != nil {
return err
}
defer sb.filesystem.Unlock(cache)
if err := sb.filesystem.Lock(ctx, staged); err != nil {
return err
}
defer sb.filesystem.Unlock(staged)
}
sealedDest, err := sb.filesystem.PrepareCacheMove(sealed, sb.ssize, false)
if err != nil {
return xerrors.Errorf("prepare move sealed: %w", err)
}
defer sb.filesystem.Release(sealedDest, sb.ssize)
cacheDest, err := sb.filesystem.PrepareCacheMove(cache, sb.ssize, false)
if err != nil {
return xerrors.Errorf("prepare move cache: %w", err)
}
defer sb.filesystem.Release(cacheDest, sb.ssize)
stagedDest, err := sb.filesystem.PrepareCacheMove(staged, sb.ssize, false)
if err != nil {
return xerrors.Errorf("prepare move staged: %w", err)
}
defer sb.filesystem.Release(stagedDest, sb.ssize)
if err := sb.filesystem.MoveSector(sealed, sealedDest); err != nil {
return xerrors.Errorf("move sealed: %w", err)
}
if err := sb.filesystem.MoveSector(cache, cacheDest); err != nil {
return xerrors.Errorf("move cache: %w", err)
}
if err := sb.filesystem.MoveSector(staged, stagedDest); err != nil {
return xerrors.Errorf("move staged: %w", err)
}
return nil
}
func (sb *SectorBuilder) DropStaged(ctx context.Context, id uint64) error {
sp, err := sb.SectorPath(fs.DataStaging, id)
if err != nil {
return xerrors.Errorf("finding staged sector: %w", err)
}
if err := sb.filesystem.Lock(ctx, sp); err != nil {
return err
}
defer sb.filesystem.Unlock(sp)
return os.RemoveAll(string(sp))
}
func (sb *SectorBuilder) ImportFrom(osb *SectorBuilder, symlink bool) error {
if err := osb.filesystem.MigrateTo(sb.filesystem, sb.ssize, symlink); err != nil {
return xerrors.Errorf("migrating sector data: %w", err)
......
......@@ -13,11 +13,12 @@ import (
"time"
ffi "github.com/filecoin-project/filecoin-ffi"
paramfetch "github.com/filecoin-project/go-paramfetch"
"github.com/ipfs/go-datastore"
logging "github.com/ipfs/go-log"
paramfetch "github.com/filecoin-project/go-paramfetch"
sectorbuilder "github.com/filecoin-project/go-sectorbuilder"
"github.com/filecoin-project/go-sectorbuilder/fs"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
......@@ -148,21 +149,41 @@ func TestSealAndVerify(t *testing.T) {
ds := datastore.NewMapDatastore()
dir, err := ioutil.TempDir("", "sbtest")
cdir, err := ioutil.TempDir("", "sbtest-c-")
if err != nil {
t.Fatal(err)
}
sdir, err := ioutil.TempDir("", "sbtest-s-")
if err != nil {
t.Fatal(err)
}
sb, err := sectorbuilder.TempSectorbuilderDir(dir, sectorSize, ds)
paths := []fs.PathConfig{
{
Path: cdir,
Cache: true,
Weight: 1,
},
{
Path: sdir,
Cache: false,
Weight: 1,
},
}
sb, err := sectorbuilder.TempSectorbuilderDir(paths, sectorSize, ds)
if err != nil {
t.Fatalf("%+v", err)
}
cleanup := func() {
if t.Failed() {
fmt.Printf("not removing %s\n", dir)
fmt.Printf("not removing %s, %s\n", cdir, sdir)
return
}
if err := os.RemoveAll(dir); err != nil {
if err := os.RemoveAll(cdir); err != nil {
t.Error(err)
}
if err := os.RemoveAll(sdir); err != nil {
t.Error(err)
}
}
......@@ -190,13 +211,17 @@ func TestSealAndVerify(t *testing.T) {
epost := time.Now()
// Restart sectorbuilder, re-run post
sb, err = sectorbuilder.TempSectorbuilderDir(dir, sectorSize, ds)
sb, err = sectorbuilder.TempSectorbuilderDir(paths, sectorSize, ds)
if err != nil {
t.Fatalf("%+v", err)
}
post(t, sb, s)
if err := sb.FinalizeSector(context.TODO(), 1); err != nil {
t.Fatalf("%+v", err)
}
fmt.Printf("PreCommit: %s\n", precommit.Sub(start).String())
fmt.Printf("Commit: %s\n", commit.Sub(precommit).String())
fmt.Printf("GenCandidates: %s\n", genCandidiates.Sub(commit).String())
......@@ -220,7 +245,7 @@ func TestSealPoStNoCommit(t *testing.T) {
t.Fatal(err)
}
sb, err := sectorbuilder.TempSectorbuilderDir(dir, sectorSize, ds)
sb, err := sectorbuilder.TempSectorbuilderDir(sectorbuilder.SimplePath(dir), sectorSize, ds)
if err != nil {
t.Fatalf("%+v", err)
}
......@@ -249,7 +274,7 @@ func TestSealPoStNoCommit(t *testing.T) {
precommit := time.Now()
// Restart sectorbuilder, re-run post
sb, err = sectorbuilder.TempSectorbuilderDir(dir, sectorSize, ds)
sb, err = sectorbuilder.TempSectorbuilderDir(sectorbuilder.SimplePath(dir), sectorSize, ds)
if err != nil {
t.Fatalf("%+v", err)
}
......@@ -284,7 +309,7 @@ func TestSealAndVerify2(t *testing.T) {
t.Fatal(err)
}
sb, err := sectorbuilder.TempSectorbuilderDir(dir, sectorSize, ds)
sb, err := sectorbuilder.TempSectorbuilderDir(sectorbuilder.SimplePath(dir), sectorSize, ds)
if err != nil {
t.Fatalf("%+v", err)
}
......@@ -332,7 +357,7 @@ func TestAcquireID(t *testing.T) {
t.Fatal(err)
}
sb, err := sectorbuilder.TempSectorbuilderDir(dir, sectorSize, ds)
sb, err := sectorbuilder.TempSectorbuilderDir(sectorbuilder.SimplePath(dir), sectorSize, ds)
if err != nil {
t.Fatalf("%+v", err)
}
......@@ -347,7 +372,7 @@ func TestAcquireID(t *testing.T) {
assertAcquire(2)
assertAcquire(3)
sb, err = sectorbuilder.TempSectorbuilderDir(dir, sectorSize, ds)
sb, err = sectorbuilder.TempSectorbuilderDir(sectorbuilder.SimplePath(dir), sectorSize, ds)
if err != nil {
t.Fatalf("%+v", err)
}
......
......@@ -66,7 +66,7 @@ type SectorBuilder struct {
unsealWait int32
fsLk sync.Mutex //nolint: struckcheck
filesystem *fs.FS // TODO: multi-fs support
filesystem *fs.FS
stopping chan struct{}
}
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment