fs.go 6.83 KB
Newer Older
Łukasz Magiera's avatar
Łukasz Magiera committed
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
package fs

import (
	"errors"
	"fmt"
	"math/big"
	"os"
	"path/filepath"
	"strconv"
	"strings"
	"sync"

	"github.com/filecoin-project/go-address"
	logging "github.com/ipfs/go-log/v2"
	"golang.org/x/xerrors"
)

var log = logging.Logger("sectorbuilder")

var ErrNotFound = errors.New("sector not found")
var ErrExists = errors.New("sector already exists")
Łukasz Magiera's avatar
Łukasz Magiera committed
22
var ErrNoSuitablePath = errors.New("no suitable path for sector fond")
Łukasz Magiera's avatar
Łukasz Magiera committed
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84

type DataType string

const (
	DataCache    DataType = "cache"
	DataStaging  DataType = "staging"
	DataSealed   DataType = "sealed"
	DataUnsealed DataType = "unsealed"
)

var types = []DataType{DataCache, DataStaging, DataSealed, DataUnsealed}

var overheadMul = map[DataType]uint64{ // * sectorSize
	DataCache:    11,
	DataStaging:  1,
	DataSealed:   1,
	DataUnsealed: 1,
}

// StoragePath is a path to storage folder (.lotusstorage)
type StoragePath string

// SectorPath is a path to sector data (.lotusstorage/sealed/s-t0101-42))
type SectorPath string

func (p SectorPath) storage() StoragePath {
	return StoragePath(filepath.Dir(filepath.Dir(string(p))))
}

func (p SectorPath) typ() DataType {
	return DataType(filepath.Base(filepath.Dir(string(p))))
}

func (p SectorPath) id() (uint64, error) {
	b := filepath.Base(string(p))
	i := strings.LastIndexByte(b, '-')
	if i < 0 {
		return 0, xerrors.Errorf("malformed sector file name: '%s', expected to be in form 's-[miner]-[id]'", b)
	}
	id, err := strconv.ParseUint(b[i+1:], 10, 64)
	if err != nil {
		return 0, xerrors.Errorf("parsing sector id (name: '%s'): %w", b, err)
	}

	return id, nil
}

func (p SectorPath) miner() (address.Address, error) {
	b := filepath.Base(string(p))
	fi := strings.IndexByte(b, '-')
	li := strings.LastIndexByte(b, '-')
	if li < 0 || fi < 0 {
		return address.Undef, xerrors.Errorf("malformed sector file name: '%s', expected to be in form 's-[miner]-[id]'", b)
	}

	return address.NewFromString(b[fi+1 : li])
}

func SectorName(miner address.Address, sectorID uint64) string {
	return fmt.Sprintf("s-%s-%d", miner, sectorID)
}

Łukasz Magiera's avatar
Łukasz Magiera committed
85
func (p StoragePath) Sector(typ DataType, miner address.Address, id uint64) SectorPath {
Łukasz Magiera's avatar
Łukasz Magiera committed
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
	return SectorPath(filepath.Join(string(p), string(typ), SectorName(miner, id)))
}

type pathInfo struct {
	cache  bool // TODO: better name?
	weight int
}

type FS struct {
	paths map[StoragePath]*pathInfo

	// in progress actions

	// path -> datatype
	reserved map[StoragePath]map[DataType]uint64

	locks map[SectorPath]chan struct{}

	lk sync.Mutex
}

type PathConfig struct {
	Path string

	Cache  bool
	Weight int
}

func OpenFs(cfg []PathConfig) *FS {
	paths := map[StoragePath]*pathInfo{}
	for _, c := range cfg {
		paths[StoragePath(c.Path)] = &pathInfo{
			cache:  c.Cache,
			weight: c.Weight,
		}
	}
	return &FS{
		paths:    paths,
		reserved: map[StoragePath]map[DataType]uint64{},
		locks:    map[SectorPath]chan struct{}{},
	}
}

func (f *FS) Init() error {
	for path := range f.paths {
		for _, dir := range []string{string(path),
			filepath.Join(string(path), string(DataCache)),
			filepath.Join(string(path), string(DataStaging)),
			filepath.Join(string(path), string(DataSealed)),
			filepath.Join(string(path), string(DataUnsealed))} {
			if err := os.Mkdir(dir, 0755); err != nil {
				if os.IsExist(err) {
					continue
				}
				return err
			}
		}
	}

	return nil
}

func (f *FS) FindSector(typ DataType, miner address.Address, id uint64) (out SectorPath, err error) {
	// TODO: consider keeping some sort of index at some point

	for path := range f.paths {
Łukasz Magiera's avatar
Łukasz Magiera committed
152
		p := path.Sector(typ, miner, id)
Łukasz Magiera's avatar
Łukasz Magiera committed
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177

		_, err := os.Stat(string(p))
		if os.IsNotExist(err) {
			continue
		}
		if err != nil {
			log.Errorf("error scanning path %s for sector %s (%s): %+v", p, SectorName(miner, id), string(typ))
			continue
		}
		if out != "" {
			if !f.paths[p.storage()].cache {
				log.Errorf("%s also found in cache at %s", p, out)
				return p, nil
			}
		}
		out = p
	}

	if out == "" {
		return "", ErrNotFound
	}

	return out, nil
}

178
func (f *FS) findBestPath(size uint64, cache bool, strict bool) (StoragePath, error) {
Łukasz Magiera's avatar
Łukasz Magiera committed
179
180
	var best StoragePath
	bestw := big.NewInt(0)
181
182
183

	// If we need cache, only return cache. If we need storage, prefer storage, fall back to cache
	bestc := true
Łukasz Magiera's avatar
Łukasz Magiera committed
184
185

	for path, info := range f.paths {
Łukasz Magiera's avatar
Łukasz Magiera committed
186
		if info.cache != cache && (bestc != info.cache || strict) {
Łukasz Magiera's avatar
Łukasz Magiera committed
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
			continue
		}

		avail, _, err := f.availableBytes(path)
		if err != nil {
			log.Errorf("%+v", err)
			continue
		}

		if uint64(avail) < size {
			continue
		}

		w := big.NewInt(avail)
		w.Mul(w, big.NewInt(int64(info.weight)))
		if w.Cmp(bestw) > 0 {
203
204
205
206
			if info.cache == cache {
				bestw = w
			}

Łukasz Magiera's avatar
Łukasz Magiera committed
207
208
209
210
211
			best = path
			bestc = info.cache
		}
	}

212
213
214
215
216
217
218
219
220
	if best == "" {
		if cache {
			return best, xerrors.Errorf("no available cache: %w", ErrNoSuitablePath)
		}

		return best, xerrors.Errorf("no available storage: %w", ErrNoSuitablePath)
	}

	return best, nil
Łukasz Magiera's avatar
Łukasz Magiera committed
221
222
223
224
225
226
227
228
229
230
231
}

func (f *FS) ForceAllocSector(typ DataType, miner address.Address, ssize uint64, cache bool, id uint64) (SectorPath, error) {
	for {
		spath, err := f.FindSector(typ, miner, id)
		if err == ErrNotFound {
			break
		}
		if err != nil {
			return "", xerrors.Errorf("looking for existing sector data: %w", err)
		}
232
		log.Warnf("found existing sector data in %s, cleaning up", spath)
Łukasz Magiera's avatar
Łukasz Magiera committed
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255

		if err := os.RemoveAll(string(spath)); err != nil {
			return "", xerrors.Errorf("cleaning up sector data: %w", err)
		}
	}

	return f.AllocSector(typ, miner, ssize, cache, id)
}

// AllocSector finds the best path for this sector to use
func (f *FS) AllocSector(typ DataType, miner address.Address, ssize uint64, cache bool, id uint64) (SectorPath, error) {
	{
		spath, err := f.FindSector(typ, miner, id)
		if err == nil {
			return spath, xerrors.Errorf("allocating sector %s: %m", spath, ErrExists)
		}
		if err != ErrNotFound {
			return "", err
		}
	}

	need := overheadMul[typ] * ssize

256
257
258
	p, err := f.findBestPath(need, cache, false)
	if err != nil {
		return "", err
Łukasz Magiera's avatar
Łukasz Magiera committed
259
260
	}

Łukasz Magiera's avatar
Łukasz Magiera committed
261
	sp := p.Sector(typ, miner, id)
Łukasz Magiera's avatar
Łukasz Magiera committed
262

Łukasz Magiera's avatar
Łukasz Magiera committed
263
	return sp, f.reserve(typ, sp.storage(), need)
Łukasz Magiera's avatar
Łukasz Magiera committed
264
265
}

Łukasz Magiera's avatar
Łukasz Magiera committed
266
func (f *FS) PrepareCacheMove(sector SectorPath, ssize uint64, tocache bool) (SectorPath, error) {
267
268
269
	p, err := f.findBestPath(ssize, tocache, true)
	if err != nil {
		return "", err
Łukasz Magiera's avatar
Łukasz Magiera committed
270
	}
271

Łukasz Magiera's avatar
Łukasz Magiera committed
272
273
274
	m, err := sector.miner()
	if err != nil {
		return "", err
Łukasz Magiera's avatar
Łukasz Magiera committed
275
	}
Łukasz Magiera's avatar
Łukasz Magiera committed
276
277
278
	id, err := sector.id()
	if err != nil {
		return "", err
Łukasz Magiera's avatar
Łukasz Magiera committed
279
280
	}

Łukasz Magiera's avatar
Łukasz Magiera committed
281
	return p.Sector(sector.typ(), m, id), f.reserve(sector.typ(), p, ssize)
Łukasz Magiera's avatar
Łukasz Magiera committed
282
283
}

Łukasz Magiera's avatar
Łukasz Magiera committed
284
func (f *FS) MoveSector(from, to SectorPath) error {
285
286
287
288
	if from == to {
		return nil
	}

Łukasz Magiera's avatar
Łukasz Magiera committed
289
	inf, err := os.Stat(string(from))
Łukasz Magiera's avatar
Łukasz Magiera committed
290
	if err != nil {
Łukasz Magiera's avatar
Łukasz Magiera committed
291
		return xerrors.Errorf("stat %s: %w", from, err)
Łukasz Magiera's avatar
Łukasz Magiera committed
292
293
	}

Łukasz Magiera's avatar
Łukasz Magiera committed
294
295
296
297
	if inf.IsDir() {
		err = migrateDir(string(from), string(to), false)
	} else {
		err = migrateFile(string(from), string(to), false)
Łukasz Magiera's avatar
Łukasz Magiera committed
298
	}
Łukasz Magiera's avatar
Łukasz Magiera committed
299
300
	if err != nil {
		return xerrors.Errorf("migrate sector %s -> %s: %w", from, to, err)
Łukasz Magiera's avatar
Łukasz Magiera committed
301
302
	}

Łukasz Magiera's avatar
Łukasz Magiera committed
303
	// TODO: run some quick checks
Łukasz Magiera's avatar
Łukasz Magiera committed
304

Łukasz Magiera's avatar
Łukasz Magiera committed
305
306
	if err := os.RemoveAll(string(from)); err != nil {
		return xerrors.Errorf("cleanup %s: %w", from, err)
Łukasz Magiera's avatar
Łukasz Magiera committed
307
308
	}

Łukasz Magiera's avatar
Łukasz Magiera committed
309
	return nil
Łukasz Magiera's avatar
Łukasz Magiera committed
310
}