fs.go 6.67 KB
Newer Older
Łukasz Magiera's avatar
Łukasz Magiera committed
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
package fs

import (
	"errors"
	"fmt"
	"math/big"
	"os"
	"path/filepath"
	"strconv"
	"strings"
	"sync"

	"github.com/filecoin-project/go-address"
	logging "github.com/ipfs/go-log/v2"
	"golang.org/x/xerrors"
)

var log = logging.Logger("sectorbuilder")

var ErrNotFound = errors.New("sector not found")
var ErrExists = errors.New("sector already exists")
Łukasz Magiera's avatar
Łukasz Magiera committed
22
var ErrNoSuitablePath = errors.New("no suitable path for sector fond")
Łukasz Magiera's avatar
Łukasz Magiera committed
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84

type DataType string

const (
	DataCache    DataType = "cache"
	DataStaging  DataType = "staging"
	DataSealed   DataType = "sealed"
	DataUnsealed DataType = "unsealed"
)

var types = []DataType{DataCache, DataStaging, DataSealed, DataUnsealed}

var overheadMul = map[DataType]uint64{ // * sectorSize
	DataCache:    11,
	DataStaging:  1,
	DataSealed:   1,
	DataUnsealed: 1,
}

// StoragePath is a path to storage folder (.lotusstorage)
type StoragePath string

// SectorPath is a path to sector data (.lotusstorage/sealed/s-t0101-42))
type SectorPath string

func (p SectorPath) storage() StoragePath {
	return StoragePath(filepath.Dir(filepath.Dir(string(p))))
}

func (p SectorPath) typ() DataType {
	return DataType(filepath.Base(filepath.Dir(string(p))))
}

func (p SectorPath) id() (uint64, error) {
	b := filepath.Base(string(p))
	i := strings.LastIndexByte(b, '-')
	if i < 0 {
		return 0, xerrors.Errorf("malformed sector file name: '%s', expected to be in form 's-[miner]-[id]'", b)
	}
	id, err := strconv.ParseUint(b[i+1:], 10, 64)
	if err != nil {
		return 0, xerrors.Errorf("parsing sector id (name: '%s'): %w", b, err)
	}

	return id, nil
}

func (p SectorPath) miner() (address.Address, error) {
	b := filepath.Base(string(p))
	fi := strings.IndexByte(b, '-')
	li := strings.LastIndexByte(b, '-')
	if li < 0 || fi < 0 {
		return address.Undef, xerrors.Errorf("malformed sector file name: '%s', expected to be in form 's-[miner]-[id]'", b)
	}

	return address.NewFromString(b[fi+1 : li])
}

func SectorName(miner address.Address, sectorID uint64) string {
	return fmt.Sprintf("s-%s-%d", miner, sectorID)
}

Łukasz Magiera's avatar
Łukasz Magiera committed
85
func (p StoragePath) Sector(typ DataType, miner address.Address, id uint64) SectorPath {
Łukasz Magiera's avatar
Łukasz Magiera committed
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
	return SectorPath(filepath.Join(string(p), string(typ), SectorName(miner, id)))
}

type pathInfo struct {
	cache  bool // TODO: better name?
	weight int
}

type FS struct {
	paths map[StoragePath]*pathInfo

	// in progress actions

	// path -> datatype
	reserved map[StoragePath]map[DataType]uint64

	locks map[SectorPath]chan struct{}

	lk sync.Mutex
}

type PathConfig struct {
	Path string

	Cache  bool
	Weight int
}

func OpenFs(cfg []PathConfig) *FS {
	paths := map[StoragePath]*pathInfo{}
	for _, c := range cfg {
		paths[StoragePath(c.Path)] = &pathInfo{
			cache:  c.Cache,
			weight: c.Weight,
		}
	}
	return &FS{
		paths:    paths,
		reserved: map[StoragePath]map[DataType]uint64{},
		locks:    map[SectorPath]chan struct{}{},
	}
}

func (f *FS) Init() error {
	for path := range f.paths {
		for _, dir := range []string{string(path),
			filepath.Join(string(path), string(DataCache)),
			filepath.Join(string(path), string(DataStaging)),
			filepath.Join(string(path), string(DataSealed)),
			filepath.Join(string(path), string(DataUnsealed))} {
			if err := os.Mkdir(dir, 0755); err != nil {
				if os.IsExist(err) {
					continue
				}
				return err
			}
		}
	}

	return nil
}

// TODO: fix the locking situation

func (f *FS) FindSector(typ DataType, miner address.Address, id uint64) (out SectorPath, err error) {
	// TODO: consider keeping some sort of index at some point

	for path := range f.paths {
Łukasz Magiera's avatar
Łukasz Magiera committed
154
		p := path.Sector(typ, miner, id)
Łukasz Magiera's avatar
Łukasz Magiera committed
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179

		_, err := os.Stat(string(p))
		if os.IsNotExist(err) {
			continue
		}
		if err != nil {
			log.Errorf("error scanning path %s for sector %s (%s): %+v", p, SectorName(miner, id), string(typ))
			continue
		}
		if out != "" {
			if !f.paths[p.storage()].cache {
				log.Errorf("%s also found in cache at %s", p, out)
				return p, nil
			}
		}
		out = p
	}

	if out == "" {
		return "", ErrNotFound
	}

	return out, nil
}

Łukasz Magiera's avatar
Łukasz Magiera committed
180
func (f *FS) findBestPath(size uint64, cache bool, strict bool) StoragePath {
Łukasz Magiera's avatar
Łukasz Magiera committed
181
182
	var best StoragePath
	bestw := big.NewInt(0)
183
184
185

	// If we need cache, only return cache. If we need storage, prefer storage, fall back to cache
	bestc := true
Łukasz Magiera's avatar
Łukasz Magiera committed
186
187

	for path, info := range f.paths {
Łukasz Magiera's avatar
Łukasz Magiera committed
188
		if info.cache != cache && (bestc != info.cache || strict) {
Łukasz Magiera's avatar
Łukasz Magiera committed
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
			continue
		}

		avail, _, err := f.availableBytes(path)
		if err != nil {
			log.Errorf("%+v", err)
			continue
		}

		if uint64(avail) < size {
			continue
		}

		w := big.NewInt(avail)
		w.Mul(w, big.NewInt(int64(info.weight)))
		if w.Cmp(bestw) > 0 {
205
206
207
208
			if info.cache == cache {
				bestw = w
			}

Łukasz Magiera's avatar
Łukasz Magiera committed
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
			best = path
			bestc = info.cache
		}
	}

	return best
}

func (f *FS) ForceAllocSector(typ DataType, miner address.Address, ssize uint64, cache bool, id uint64) (SectorPath, error) {
	for {
		spath, err := f.FindSector(typ, miner, id)
		if err == ErrNotFound {
			break
		}
		if err != nil {
			return "", xerrors.Errorf("looking for existing sector data: %w", err)
		}
226
		log.Warnf("found existing sector data in %s, cleaning up", spath)
Łukasz Magiera's avatar
Łukasz Magiera committed
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249

		if err := os.RemoveAll(string(spath)); err != nil {
			return "", xerrors.Errorf("cleaning up sector data: %w", err)
		}
	}

	return f.AllocSector(typ, miner, ssize, cache, id)
}

// AllocSector finds the best path for this sector to use
func (f *FS) AllocSector(typ DataType, miner address.Address, ssize uint64, cache bool, id uint64) (SectorPath, error) {
	{
		spath, err := f.FindSector(typ, miner, id)
		if err == nil {
			return spath, xerrors.Errorf("allocating sector %s: %m", spath, ErrExists)
		}
		if err != ErrNotFound {
			return "", err
		}
	}

	need := overheadMul[typ] * ssize

250
	p := f.findBestPath(need, cache, false)
Łukasz Magiera's avatar
Łukasz Magiera committed
251
	if p == "" {
Łukasz Magiera's avatar
Łukasz Magiera committed
252
		return "", ErrNoSuitablePath
Łukasz Magiera's avatar
Łukasz Magiera committed
253
254
	}

Łukasz Magiera's avatar
Łukasz Magiera committed
255
	sp := p.Sector(typ, miner, id)
Łukasz Magiera's avatar
Łukasz Magiera committed
256

Łukasz Magiera's avatar
Łukasz Magiera committed
257
	return sp, f.reserve(typ, sp.storage(), need)
Łukasz Magiera's avatar
Łukasz Magiera committed
258
259
}

Łukasz Magiera's avatar
Łukasz Magiera committed
260
261
262
263
func (f *FS) PrepareCacheMove(sector SectorPath, ssize uint64, tocache bool) (SectorPath, error) {
	p := f.findBestPath(ssize, tocache, true)
	if p == "" {
		return "", ErrNoSuitablePath
Łukasz Magiera's avatar
Łukasz Magiera committed
264
	}
Łukasz Magiera's avatar
Łukasz Magiera committed
265
266
267
	m, err := sector.miner()
	if err != nil {
		return "", err
Łukasz Magiera's avatar
Łukasz Magiera committed
268
	}
Łukasz Magiera's avatar
Łukasz Magiera committed
269
270
271
	id, err := sector.id()
	if err != nil {
		return "", err
Łukasz Magiera's avatar
Łukasz Magiera committed
272
273
	}

Łukasz Magiera's avatar
Łukasz Magiera committed
274
	return p.Sector(sector.typ(), m, id), f.reserve(sector.typ(), p, ssize)
Łukasz Magiera's avatar
Łukasz Magiera committed
275
276
}

Łukasz Magiera's avatar
Łukasz Magiera committed
277
func (f *FS) MoveSector(from, to SectorPath) error {
278
279
280
281
	if from == to {
		return nil
	}

Łukasz Magiera's avatar
Łukasz Magiera committed
282
	inf, err := os.Stat(string(from))
Łukasz Magiera's avatar
Łukasz Magiera committed
283
	if err != nil {
Łukasz Magiera's avatar
Łukasz Magiera committed
284
		return xerrors.Errorf("stat %s: %w", from, err)
Łukasz Magiera's avatar
Łukasz Magiera committed
285
286
	}

Łukasz Magiera's avatar
Łukasz Magiera committed
287
288
289
290
	if inf.IsDir() {
		err = migrateDir(string(from), string(to), false)
	} else {
		err = migrateFile(string(from), string(to), false)
Łukasz Magiera's avatar
Łukasz Magiera committed
291
	}
Łukasz Magiera's avatar
Łukasz Magiera committed
292
293
	if err != nil {
		return xerrors.Errorf("migrate sector %s -> %s: %w", from, to, err)
Łukasz Magiera's avatar
Łukasz Magiera committed
294
295
	}

Łukasz Magiera's avatar
Łukasz Magiera committed
296
	// TODO: run some quick checks
Łukasz Magiera's avatar
Łukasz Magiera committed
297

Łukasz Magiera's avatar
Łukasz Magiera committed
298
299
	if err := os.RemoveAll(string(from)); err != nil {
		return xerrors.Errorf("cleanup %s: %w", from, err)
Łukasz Magiera's avatar
Łukasz Magiera committed
300
301
	}

Łukasz Magiera's avatar
Łukasz Magiera committed
302
	return nil
Łukasz Magiera's avatar
Łukasz Magiera committed
303
}