Commit 3f336f70 authored by laser's avatar laser
Browse files

feat(drgplus): conform to new DRG+ API + expose seal, resume

parent 260a4fad
......@@ -36,6 +36,12 @@ type SortedSectorInfo struct {
f []SectorInfo
}
// SealTicket commits a sector to a subchain.
type SealTicket struct {
BlockHeight uint64
TicketBytes [32]byte
}
// NewSortedSectorInfo returns a SortedSectorInfo
func NewSortedSectorInfo(sectorInfo ...SectorInfo) SortedSectorInfo {
fn := func(i, j int) bool {
......@@ -92,6 +98,7 @@ type SealedSectorMetadata struct {
Proof []byte
Pieces []PieceMetadata
Health sealed_sector_health.Health
Ticket SealTicket
}
// SectorSealingStatus communicates how far along in the sealing process a
......@@ -102,9 +109,9 @@ type SectorSealingStatus struct {
SealErrorMsg string // will be nil unless State == Failed
CommD [CommitmentBytesLen]byte // will be empty unless State == Sealed
CommR [CommitmentBytesLen]byte // will be empty unless State == Sealed
CommRStar [CommitmentBytesLen]byte // will be empty unless State == Sealed
Proof []byte // will be empty unless State == Sealed
Pieces []PieceMetadata // will be empty unless State == Sealed
Ticket SealTicket // will be empty unless State == Sealed
}
// PieceMetadata represents a piece stored by the sector builder.
......@@ -121,8 +128,8 @@ func VerifySeal(
sectorSize uint64,
commR [CommitmentBytesLen]byte,
commD [CommitmentBytesLen]byte,
commRStar [CommitmentBytesLen]byte,
proverID [31]byte,
proverID [32]byte,
ticket [32]byte,
sectorID uint64,
proof []byte,
) (bool, error) {
......@@ -134,23 +141,23 @@ func VerifySeal(
commRCBytes := C.CBytes(commR[:])
defer C.free(commRCBytes)
commRStarCBytes := C.CBytes(commRStar[:])
defer C.free(commRStarCBytes)
proofCBytes := C.CBytes(proof[:])
defer C.free(proofCBytes)
proverIDCBytes := C.CBytes(proverID[:])
defer C.free(proverIDCBytes)
ticketCBytes := C.CBytes(ticket[:])
defer C.free(ticketCBytes)
// a mutable pointer to a VerifySealResponse C-struct
resPtr := C.sector_builder_ffi_verify_seal(
C.uint64_t(sectorSize),
(*[CommitmentBytesLen]C.uint8_t)(commRCBytes),
(*[CommitmentBytesLen]C.uint8_t)(commDCBytes),
(*[CommitmentBytesLen]C.uint8_t)(commRStarCBytes),
(*[31]C.uint8_t)(proverIDCBytes),
(*[32]C.uint8_t)(proverIDCBytes),
C.uint64_t(sectorID),
(*[32]C.uint8_t)(ticketCBytes),
(*C.uint8_t)(proofCBytes),
C.size_t(len(proof)),
)
......@@ -244,7 +251,7 @@ func InitSectorBuilder(
poStProofPartitions uint8,
lastUsedSectorID uint64,
metadataDir string,
proverID [31]byte,
proverID [32]byte,
sealedSectorDir string,
stagedSectorDir string,
maxNumOpenStagedSectors uint8,
......@@ -272,7 +279,7 @@ func InitSectorBuilder(
class,
C.uint64_t(lastUsedSectorID),
cMetadataDir,
(*[31]C.uint8_t)(proverIDCBytes),
(*[32]C.uint8_t)(proverIDCBytes),
cSealedSectorDir,
cStagedSectorDir,
C.uint8_t(maxNumOpenStagedSectors),
......@@ -377,18 +384,81 @@ func ReadPieceFromSealedSector(sectorBuilderPtr unsafe.Pointer, pieceKey string)
return goBytes(resPtr.data_ptr, resPtr.data_len), nil
}
// SealAllStagedSectors schedules sealing of all staged sectors.
func SealAllStagedSectors(sectorBuilderPtr unsafe.Pointer) error {
// SealSector seals the sector with the provided id, blocking until sealing
// completes. If no staged sector exists in the ReadyToSeal state with such an
// id, an error will be returned.
func SealSector(sectorBuilderPtr unsafe.Pointer, sectorID uint64, ticket SealTicket) (SealedSectorMetadata, error) {
defer elapsed("SealSector")()
cTicketBytes := C.CBytes(ticket.TicketBytes[:])
defer C.free(cTicketBytes)
cSealTicket := C.sector_builder_ffi_FFISealTicket{
block_height: C.uint64_t(ticket.BlockHeight),
ticket_bytes: *(*[32]C.uint8_t)(cTicketBytes),
}
resPtr := C.sector_builder_ffi_seal_sector((*C.sector_builder_ffi_SectorBuilder)(sectorBuilderPtr), C.uint64_t(sectorID), cSealTicket)
defer C.sector_builder_ffi_destroy_seal_sector_response(resPtr)
if resPtr.status_code != 0 {
return SealedSectorMetadata{}, errors.New(C.GoString(resPtr.error_msg))
}
meta, err := goSealedSectorMetadata((*C.sector_builder_ffi_FFISealedSectorMetadata)(unsafe.Pointer(&resPtr.meta)), 1)
if err != nil {
return SealedSectorMetadata{}, err
}
return meta[0], nil
}
// ResumeSealSector resumes sealing for a sector in the Paused state. If no
// staged sector exists in such a state, an error will be returned.
func ResumeSealSector(sectorBuilderPtr unsafe.Pointer, sectorID uint64) (SealedSectorMetadata, error) {
defer elapsed("ResumeSealSector")()
resPtr := C.sector_builder_ffi_resume_seal_sector((*C.sector_builder_ffi_SectorBuilder)(sectorBuilderPtr), C.uint64_t(sectorID))
defer C.sector_builder_ffi_destroy_resume_seal_sector_response(resPtr)
if resPtr.status_code != 0 {
return SealedSectorMetadata{}, errors.New(C.GoString(resPtr.error_msg))
}
meta, err := goSealedSectorMetadata((*C.sector_builder_ffi_FFISealedSectorMetadata)(unsafe.Pointer(&resPtr.meta)), 1)
if err != nil {
return SealedSectorMetadata{}, err
}
return meta[0], nil
}
// SealAllStagedSectors seals all staged sectors and returns sealed sector
// metadata for all successfully sealed sectors.
func SealAllStagedSectors(sectorBuilderPtr unsafe.Pointer, ticket SealTicket) ([]SealedSectorMetadata, error) {
defer elapsed("SealAllStagedSectors")()
resPtr := C.sector_builder_ffi_seal_all_staged_sectors((*C.sector_builder_ffi_SectorBuilder)(sectorBuilderPtr))
cTicketBytes := C.CBytes(ticket.TicketBytes[:])
defer C.free(cTicketBytes)
cSealTicket := C.sector_builder_ffi_FFISealTicket{
block_height: C.uint64_t(ticket.BlockHeight),
ticket_bytes: *(*[32]C.uint8_t)(cTicketBytes),
}
resPtr := C.sector_builder_ffi_seal_all_staged_sectors((*C.sector_builder_ffi_SectorBuilder)(sectorBuilderPtr), cSealTicket)
defer C.sector_builder_ffi_destroy_seal_all_staged_sectors_response(resPtr)
if resPtr.status_code != 0 {
return errors.New(C.GoString(resPtr.error_msg))
return nil, errors.New(C.GoString(resPtr.error_msg))
}
return nil
meta, err := goSealedSectorMetadata(resPtr.meta_ptr, resPtr.meta_len)
if err != nil {
return nil, err
}
return meta, nil
}
// GetAllStagedSectors returns a slice of all staged sector metadata for the sector builder.
......@@ -445,6 +515,10 @@ func GetSectorSealingStatusByID(sectorBuilderPtr unsafe.Pointer, sectorID uint64
if resPtr.seal_status_code == C.Failed {
return SectorSealingStatus{SectorID: sectorID, State: sealing_state.Failed, SealErrorMsg: C.GoString(resPtr.seal_error_msg)}, nil
} else if resPtr.seal_status_code == C.ReadyForSealing {
return SectorSealingStatus{SectorID: sectorID, State: sealing_state.ReadyForSealing}, nil
} else if resPtr.seal_status_code == C.Paused {
return SectorSealingStatus{SectorID: sectorID, State: sealing_state.Paused}, nil
} else if resPtr.seal_status_code == C.Pending {
return SectorSealingStatus{SectorID: sectorID, State: sealing_state.Pending}, nil
} else if resPtr.seal_status_code == C.Sealing {
......@@ -458,10 +532,6 @@ func GetSectorSealingStatusByID(sectorBuilderPtr unsafe.Pointer, sectorID uint64
var commD [CommitmentBytesLen]byte
copy(commD[:], commDSlice)
commRStarSlice := goBytes(&resPtr.comm_r_star[0], CommitmentBytesLen)
var commRStar [CommitmentBytesLen]byte
copy(commRStar[:], commRStarSlice)
proof := goBytes(resPtr.proof_ptr, resPtr.proof_len)
ps, err := goPieceMetadata(resPtr.pieces_ptr, resPtr.pieces_len)
......@@ -470,13 +540,13 @@ func GetSectorSealingStatusByID(sectorBuilderPtr unsafe.Pointer, sectorID uint64
}
return SectorSealingStatus{
SectorID: sectorID,
State: sealing_state.Sealed,
CommD: commD,
CommR: commR,
CommRStar: commRStar,
Proof: proof,
Pieces: ps,
SectorID: sectorID,
State: sealing_state.Sealed,
CommD: commD,
CommR: commR,
Proof: proof,
Pieces: ps,
Ticket: goSealTicket(resPtr.seal_ticket),
}, nil
} else {
// unknown
......@@ -603,7 +673,7 @@ func getAllSealedSectors(sectorBuilderPtr unsafe.Pointer, performHealthchecks bo
return nil, errors.New(C.GoString(resPtr.error_msg))
}
meta, err := goSealedSectorMetadata(resPtr.sectors_ptr, resPtr.sectors_len)
meta, err := goSealedSectorMetadata(resPtr.meta_ptr, resPtr.meta_len)
if err != nil {
return nil, err
}
......
......@@ -20,6 +20,18 @@ import (
)
func TestSectorBuilderLifecycle(t *testing.T) {
ticketA := sb.SealTicket{
BlockHeight: 0,
TicketBytes: [32]byte{},
}
ticketB := sb.SealTicket{
BlockHeight: 10,
TicketBytes: [32]byte{1, 2, 3},
}
proverID := [32]byte{6, 7, 8}
metadataDir := requireTempDirPath(t)
defer require.NoError(t, os.Remove(metadataDir))
......@@ -29,7 +41,7 @@ func TestSectorBuilderLifecycle(t *testing.T) {
stagedSectorDir := requireTempDirPath(t)
defer require.NoError(t, os.Remove(stagedSectorDir))
ptr, err := sb.InitSectorBuilder(1024, 2, 1, 0, metadataDir, [31]byte{}, sealedSectorDir, stagedSectorDir, 1)
ptr, err := sb.InitSectorBuilder(1024, 2, 1, 0, metadataDir, proverID, sealedSectorDir, stagedSectorDir, 1)
require.NoError(t, err)
defer sb.DestroySectorBuilder(ptr)
......@@ -49,19 +61,21 @@ func TestSectorBuilderLifecycle(t *testing.T) {
require.Equal(t, uint64(read), maxPieceSize)
require.NoError(t, err)
pieceFile := requireTempFile(t, bytes.NewReader(pieceBytes), maxPieceSize)
pieceFileA := requireTempFile(t, bytes.NewReader(pieceBytes), maxPieceSize)
require.NoError(t, err)
pieceFileB := requireTempFile(t, bytes.NewReader(pieceBytes), maxPieceSize)
// generate piece commitment
commP, err := sb.GeneratePieceCommitmentFromFile(pieceFile, maxPieceSize)
commP, err := sb.GeneratePieceCommitmentFromFile(pieceFileA, maxPieceSize)
require.NoError(t, err)
// seek to the beginning
_, err = pieceFile.Seek(0, 0)
_, err = pieceFileA.Seek(0, 0)
require.NoError(t, err)
// write a piece to a staged sector, reducing remaining space to 0 and
// triggering the seal job
sectorID, err := sb.AddPieceFromFile(ptr, "snoqualmie", maxPieceSize, pieceFile)
// write a piece to a staged sector, reducing remaining space to 0
sectorIDA, err := sb.AddPieceFromFile(ptr, "snoqualmie", maxPieceSize, pieceFileA)
require.NoError(t, err)
stagedSectors, err := sb.GetAllStagedSectors(ptr)
......@@ -70,26 +84,57 @@ func TestSectorBuilderLifecycle(t *testing.T) {
stagedSector := stagedSectors[0]
require.Equal(t, uint64(1), stagedSector.SectorID)
// block until Groth parameter cache is (lazily) hydrated and sector has
// been sealed (or timeout)
status, err := pollForSectorSealingStatus(ptr, sectorID, sealing_state.Sealed, time.Minute*30)
// block until the sector is ready for us to begin sealing
statusA, err := pollForSectorSealingStatus(ptr, sectorIDA, sealing_state.ReadyForSealing, time.Minute)
require.NoError(t, err)
// seal all staged sectors
go func() {
// blocks until sealing has completed
meta, err := sb.SealAllStagedSectors(ptr, ticketA)
require.NoError(t, err)
require.Equal(t, 1, len(meta))
require.Equal(t, 1, len(meta[0].Pieces), "expected to see the one piece we added")
require.Equal(t, stagedSector.SectorID, meta[0].SectorID)
}()
// block until the sector begins to seal
_, err = pollForSectorSealingStatus(ptr, sectorIDA, sealing_state.Sealing, 15*time.Second)
require.NoError(t, err)
require.Equal(t, 1, len(status.Pieces), "expected to see the one piece we added")
// write a second piece to a staged sector, reducing remaining space to 0
sectorIDB, err := sb.AddPieceFromFile(ptr, "duvall", maxPieceSize, pieceFileB)
require.NoError(t, err)
go func() {
meta, err := sb.SealSector(ptr, sectorIDB, ticketB)
require.NoError(t, err)
require.Equal(t, sectorIDB, meta.SectorID)
}()
// block until both sectors have successfully sealed
statusA, err = pollForSectorSealingStatus(ptr, sectorIDA, sealing_state.Sealed, 30*time.Minute)
require.NoError(t, err)
require.Equal(t, ticketA, statusA.Ticket)
statusB, err := pollForSectorSealingStatus(ptr, sectorIDB, sealing_state.Sealed, 30*time.Minute)
require.NoError(t, err)
require.Equal(t, ticketB, statusB.Ticket)
// verify the seal proof
isValid, err := sb.VerifySeal(1024, status.CommR, status.CommD, status.CommRStar, [31]byte{}, sectorID, status.Proof)
isValid, err := sb.VerifySeal(1024, statusA.CommR, statusA.CommD, proverID, ticketA.TicketBytes, sectorIDA, statusA.Proof)
require.NoError(t, err)
require.True(t, isValid)
// verify the piece inclusion proof
isValid, err = sb.VerifyPieceInclusionProof(1024, maxPieceSize, commP, status.CommD, status.Pieces[0].InclusionProof)
isValid, err = sb.VerifyPieceInclusionProof(1024, maxPieceSize, commP, statusA.CommD, statusA.Pieces[0].InclusionProof)
require.NoError(t, err)
require.True(t, isValid)
// enforces sort ordering of SectorInfo tuples
sectorInfo := sb.NewSortedSectorInfo(sb.SectorInfo{
SectorID: status.SectorID,
CommR: status.CommR,
SectorID: statusA.SectorID,
CommR: statusA.CommR,
})
// generate a PoSt
......@@ -103,14 +148,13 @@ func TestSectorBuilderLifecycle(t *testing.T) {
sealedSectors, err = sb.GetAllSealedSectorsWithHealth(ptr)
require.NoError(t, err)
require.Equal(t, 1, len(sealedSectors), "expected to see one sealed sector")
sealedSector := sealedSectors[0]
require.Equal(t, uint64(1), sealedSector.SectorID)
require.Equal(t, 1, len(sealedSector.Pieces))
require.Equal(t, sealed_sector_health.Ok, sealedSector.Health)
// the piece is the size of the sector, so its piece commitment should be the
// data commitment
require.Equal(t, commP, sealedSector.CommD)
require.Equal(t, 2, len(sealedSectors), "expected to see two sealed sectors")
for _, sealedSector := range sealedSectors {
require.Equal(t, sealed_sector_health.Ok, sealedSector.Health)
}
// both sealed sectors contain the same data, so either will suffice
require.Equal(t, commP, sealedSectors[0].CommD)
// unseal the sector and retrieve the client's piece, verifying that the
// retrieved bytes match what we originally wrote to the staged sector
......
......@@ -6,8 +6,6 @@ require (
github.com/golangci/golangci-lint v1.17.1 // indirect
github.com/hashicorp/golang-lru v0.5.3 // indirect
github.com/ipfs/go-log v0.0.1
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
github.com/modern-go/reflect2 v1.0.1 // indirect
github.com/neelance/parallel v0.0.0-20160708114440-4de9ce63d14c // indirect
github.com/opentracing/basictracer-go v1.0.0 // indirect
github.com/pkg/errors v0.8.1
......
......@@ -4,11 +4,13 @@ package sealing_state
type State int
const (
Unknown State = iota
Pending // sector is still accepting user data
Failed // sealing failed
Sealing // sector is currently being sealed
Sealed // sector has been sealed successfully
Unknown State = iota
Pending // sector is still accepting user data
Failed // sealing failed
Sealing // sector is currently being sealed
Sealed // sector has been sealed successfully
Paused // sector sealing has been paused and can be resumed
ReadyForSealing // staged sector is full and is ready to seal
)
var labels = [...]string{
......@@ -17,6 +19,8 @@ var labels = [...]string{
"Failed",
"Sealing",
"Sealed",
"Paused",
"ReadyForSealing",
}
func (el State) String() string {
......
......@@ -44,6 +44,17 @@ func goBytes(src *C.uint8_t, size C.size_t) []byte {
return C.GoBytes(unsafe.Pointer(src), C.int(size))
}
func goSealTicket(src C.sector_builder_ffi_FFISealTicket) SealTicket {
ticketBytesSlice := C.GoBytes(unsafe.Pointer(&src.ticket_bytes[0]), 32)
var ticketBytes [CommitmentBytesLen]byte
copy(ticketBytes[:], ticketBytesSlice)
return SealTicket{
TicketBytes: ticketBytes,
BlockHeight: uint64(src.block_height),
}
}
func goStagedSectorMetadata(src *C.sector_builder_ffi_FFIStagedSectorMetadata, size C.size_t) ([]StagedSectorMetadata, error) {
sectors := make([]StagedSectorMetadata, size)
if src == nil || size == 0 {
......@@ -76,10 +87,6 @@ func goSealedSectorMetadata(src *C.sector_builder_ffi_FFISealedSectorMetadata, s
var commR [CommitmentBytesLen]byte
copy(commR[:], commRSlice)
commRStarSlice := goBytes(&ptrs[i].comm_r_star[0], CommitmentBytesLen)
var commRStar [CommitmentBytesLen]byte
copy(commRStar[:], commRStarSlice)
proof := goBytes(ptrs[i].proofs_ptr, ptrs[i].proofs_len)
pieces, err := goPieceMetadata(ptrs[i].pieces_ptr, ptrs[i].pieces_len)
......@@ -93,13 +100,13 @@ func goSealedSectorMetadata(src *C.sector_builder_ffi_FFISealedSectorMetadata, s
}
sectors[i] = SealedSectorMetadata{
SectorID: uint64(ptrs[i].sector_id),
CommD: commD,
CommR: commR,
CommRStar: commRStar,
Proof: proof,
Pieces: pieces,
Health: health,
SectorID: uint64(ptrs[i].sector_id),
CommD: commD,
CommR: commR,
Proof: proof,
Pieces: pieces,
Health: health,
Ticket: goSealTicket(ptrs[i].seal_ticket),
}
}
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment