feat(statesync): implement statesync spec for the new approach #655

Merged · 22 commits · Jul 19, 2023
Changes from 5 commits
920 changes: 414 additions & 506 deletions abci/types/types.pb.go

Large diffs are not rendered by default.

54 changes: 23 additions & 31 deletions internal/statesync/chunks.go
@@ -19,6 +19,7 @@ var (
errQueueEmpty = errors.New("requestQueue is empty")
errChunkNil = errors.New("cannot add nil chunk")
errNoChunkItem = errors.New("no chunk item found")
+ errNilSnapshot = errors.New("snapshot is nil")
)

const (
@@ -78,15 +79,15 @@ func newChunkQueue(snapshot *snapshot, tempDir string, bufLen int) (*chunkQueue,
}, nil
}

- // IsRequestEmpty returns true if the request queue is empty
- func (q *chunkQueue) IsRequestEmpty() bool {
- q.mtx.Lock()
- defer q.mtx.Unlock()
- return len(q.requestQueue) == 0
+ // IsRequestQueueEmpty returns true if the request queue is empty
+ func (q *chunkQueue) IsRequestQueueEmpty() bool {
+ return q.RequestQueueLen() == 0
}

- // IsRequestLen returns the length of the request queue
- func (q *chunkQueue) IsRequestLen() int {
+ // RequestQueueLen returns the length of the request queue
+ func (q *chunkQueue) RequestQueueLen() int {
q.mtx.Lock()
defer q.mtx.Unlock()
return len(q.requestQueue)
}
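
The rename also simplifies the locking: the emptiness check now delegates to the single mutex-guarded length accessor instead of duplicating the locked read. A minimal standalone sketch of that pattern (hypothetical queue type, not the PR's chunkQueue):

package main

import (
    "fmt"
    "sync"
)

type queue struct {
    mtx   sync.Mutex
    items []string
}

// Len is the only method that touches the mutex directly.
func (q *queue) Len() int {
    q.mtx.Lock()
    defer q.mtx.Unlock()
    return len(q.items)
}

// Empty derives its answer from Len, mirroring how
// IsRequestQueueEmpty now calls RequestQueueLen.
func (q *queue) Empty() bool { return q.Len() == 0 }

func main() {
    q := &queue{}
    fmt.Println(q.Empty(), q.Len()) // true 0
}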

@@ -136,13 +137,12 @@ func (q *chunkQueue) Add(chunk *chunk) (bool, error) {
q.mtx.Lock()
defer q.mtx.Unlock()
if q.snapshot == nil {
- return false, nil // queue is closed
+ return false, errNilSnapshot
}
chunkIDKey := chunk.ID.String()
- item := q.items[chunkIDKey]
- _, ok := q.items[chunk.ID.String()]
+ item, ok := q.items[chunkIDKey]
if !ok {
return false, fmt.Errorf("chunk item %x not found", chunk.ID)
return false, fmt.Errorf("failed to add the chunk %x, it was never requested", chunk.ID)
}
if item.status != inProgressStatus && item.status != discardedStatus {
return false, nil
@@ -152,15 +152,15 @@ func (q *chunkQueue) Add(chunk *chunk) (bool, error) {
return false, err
}
item.file = filepath.Join(q.dir, chunkIDKey)
- item.sender = chunk.Sender
- item.status = receivedStatus
err = item.write(chunk.Chunk)
if err != nil {
return false, err
}
+ item.sender = chunk.Sender
+ item.status = receivedStatus
q.applyCh <- chunk.ID
// Signal any waiters that the chunk has arrived.
- item.closeWaiteChs(true)
+ item.closeWaitChs(true)
return true, nil
}
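
Note the reordering in this hunk: the sender and receivedStatus are now recorded only after item.write succeeds, so a failed write can no longer leave a chunk marked as received. A self-contained sketch of the idea (illustrative types, not the PR's code):

package main

import (
    "fmt"
    "os"
    "path/filepath"
)

type status int

const (
    inProgressStatus status = iota
    receivedStatus
)

type item struct {
    file   string
    status status
}

// write persists the chunk body; any error leaves the item untouched.
func (c *item) write(data []byte) error {
    return os.WriteFile(c.file, data, 0o600)
}

func main() {
    c := &item{file: filepath.Join(os.TempDir(), "chunk-demo")}
    // Mirror of the reordered Add(): persist first, mark received after.
    if err := c.write([]byte("chunk body")); err != nil {
        fmt.Println("write failed, status unchanged:", c.status) // 0
        return
    }
    c.status = receivedStatus
    fmt.Println("status:", c.status) // 1 (receivedStatus)
}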

@@ -177,7 +177,7 @@ func (q *chunkQueue) Close() error {
<-q.applyCh
}
for _, item := range q.items {
- item.closeWaiteChs(false)
+ item.closeWaitChs(false)
}
if err := os.RemoveAll(q.dir); err != nil {
return fmt.Errorf("failed to clean up state sync tempdir %s: %w", q.dir, err)
@@ -214,7 +214,7 @@ func (q *chunkQueue) DiscardSender(peerID types.NodeID) error {
q.mtx.Lock()
defer q.mtx.Unlock()
for _, item := range q.items {
- if item.isDiscardable(peerID) {
+ if item.sender == peerID && item.isDiscardable() {
err := q.discard(item.chunkID)
if err != nil {
return err
@@ -236,14 +236,6 @@ func (q *chunkQueue) GetSender(chunkID bytes.HexBytes) types.NodeID {
return ""
}

- // Has checks whether a chunk exists in the queue.
- func (q *chunkQueue) Has(chunkID bytes.HexBytes) bool {
- q.mtx.Lock()
- defer q.mtx.Unlock()
- item, ok := q.items[chunkID.String()]
- return ok && item.status == doneStatus
- }
-
// load loads a chunk from disk, or nil if the chunk is not in the queue. The caller must hold the
// mutex lock.
func (q *chunkQueue) load(chunkID bytes.HexBytes) (*chunk, error) {
@@ -321,8 +313,7 @@ func (q *chunkQueue) RetryAll() {
}

// WaitFor returns a channel that receives a chunk ID when it arrives in the queue, or
- // immediately if it has already arrived. The channel is closed without a value if the queue is
- // closed or if the chunk ID is not valid.
+ // immediately if it has already arrived. The channel is closed without a value if the queue is closed
func (q *chunkQueue) WaitFor(chunkID bytes.HexBytes) <-chan bytes.HexBytes {
q.mtx.Lock()
defer q.mtx.Unlock()
@@ -331,11 +322,11 @@ func (q *chunkQueue) WaitFor(chunkID bytes.HexBytes) <-chan bytes.HexBytes {

func (q *chunkQueue) waitFor(chunkID bytes.HexBytes) <-chan bytes.HexBytes {
ch := make(chan bytes.HexBytes, 1)
- item, ok := q.items[chunkID.String()]
if q.snapshot == nil {
close(ch)
return ch
}
+ item, ok := q.items[chunkID.String()]
if !ok {
ch <- chunkID
close(ch)
@@ -345,7 +336,7 @@ func (q *chunkQueue) waitFor(chunkID bytes.HexBytes) <-chan bytes.HexBytes {
return ch
}

- // DoneChunksCount returns the number of chunks that have been returned.=
+ // DoneChunksCount returns the number of chunks that have been returned
func (q *chunkQueue) DoneChunksCount() int {
q.mtx.Lock()
defer q.mtx.Unlock()
@@ -390,7 +381,7 @@ func (c *chunkItem) loadData() ([]byte, error) {
return body, nil
}

- func (c *chunkItem) closeWaiteChs(send bool) {
+ func (c *chunkItem) closeWaitChs(send bool) {
for _, ch := range c.waitChs {
if send {
ch <- c.chunkID
@@ -400,6 +391,7 @@ func (c *chunkItem) closeWaiteChs(send bool) {
c.waitChs = nil
}

- func (c *chunkItem) isDiscardable(peerID types.NodeID) bool {
- return c.sender == peerID && c.status == initStatus
+ // isDiscardable returns true if the item's status allows a transition to discarded, otherwise false
+ func (c *chunkItem) isDiscardable() bool {
+ return c.status == initStatus
}
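
Beyond the closeWaiteChs -> closeWaitChs typo fix, the pattern itself is worth spelling out: every waiter holds a buffered channel, arrival sends the chunk ID before closing, and shutdown closes without sending, so receivers can tell the two cases apart. A runnable sketch under those assumptions (names simplified from chunkItem):

package main

import "fmt"

type waiterItem struct {
    chunkID string
    waitChs []chan string
}

// closeWaitChs notifies all waiters: on arrival (send == true) each
// buffered channel gets the ID before being closed; on shutdown the
// channels are simply closed.
func (c *waiterItem) closeWaitChs(send bool) {
    for _, ch := range c.waitChs {
        if send {
            ch <- c.chunkID // buffered with capacity 1: never blocks
        }
        close(ch)
    }
    c.waitChs = nil
}

func main() {
    it := &waiterItem{chunkID: "ab12"}
    ch := make(chan string, 1)
    it.waitChs = append(it.waitChs, ch)
    it.closeWaitChs(true)
    if id, ok := <-ch; ok {
        fmt.Println("chunk arrived:", id)
    } else {
        fmt.Println("queue closed before arrival")
    }
}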
8 changes: 4 additions & 4 deletions internal/statesync/chunks_test.go
@@ -122,7 +122,7 @@ func (suite *ChunkQueueTestSuite) TestChunkQueue() {
err = suite.queue.Close()
require.NoError(err)
added, err = suite.queue.Add(suite.chunks[0])
- require.NoError(err)
+ require.Error(err, errNilSnapshot)
require.False(added)

// Closing the queue again should also be fine
@@ -213,7 +213,7 @@ func (suite *ChunkQueueTestSuite) TestDiscardSender() {
// returned.
err = suite.queue.DiscardSender(suite.chunks[1].Sender)
suite.Require().NoError(err)
- suite.Require().True(suite.queue.IsRequestEmpty())
+ suite.Require().True(suite.queue.IsRequestQueueEmpty())
}

func (suite *ChunkQueueTestSuite) TestGetSender() {
@@ -304,9 +304,9 @@ func (suite *ChunkQueueTestSuite) TestRetryAll() {
suite.initChunks()
suite.processChunks()
require := suite.Require()
- require.True(suite.queue.IsRequestEmpty())
+ require.True(suite.queue.IsRequestQueueEmpty())
suite.queue.RetryAll()
- require.Equal(len(suite.chunks), suite.queue.IsRequestLen())
+ require.Equal(len(suite.chunks), suite.queue.RequestQueueLen())
}

func (suite *ChunkQueueTestSuite) TestWaitFor() {
9 changes: 0 additions & 9 deletions internal/statesync/reactor.go
@@ -1204,15 +1204,6 @@ func (r *Reactor) SnapshotHeight() int64 {
}
return 0
}
- func (r *Reactor) SnapshotChunksCount() int64 {
- r.mtx.RLock()
- defer r.mtx.RUnlock()
-
- if r.syncer != nil && r.syncer.chunkQueue != nil {
- return int64(r.syncer.chunkQueue.DoneChunksCount())
- }
- return 0
- }

func (r *Reactor) BackFilledBlocks() int64 {
r.mtx.RLock()
24 changes: 12 additions & 12 deletions internal/statesync/snapshots.go
@@ -51,9 +51,9 @@ type snapshotPool struct {
snapshotPeers map[snapshotKey]map[types.NodeID]types.NodeID

// indexes for fast searches
- formatIndex map[uint32]map[snapshotKey]bool
- heightIndex map[uint64]map[snapshotKey]bool
- peerIndex map[types.NodeID]map[snapshotKey]bool
+ versionIndex map[uint32]map[snapshotKey]bool
+ heightIndex map[uint64]map[snapshotKey]bool
+ peerIndex map[types.NodeID]map[snapshotKey]bool

// blacklists for rejected items
formatBlacklist map[uint32]bool
@@ -66,7 +66,7 @@ func newSnapshotPool() *snapshotPool {
return &snapshotPool{
snapshots: make(map[snapshotKey]*snapshot),
snapshotPeers: make(map[snapshotKey]map[types.NodeID]types.NodeID),
- formatIndex: make(map[uint32]map[snapshotKey]bool),
+ versionIndex: make(map[uint32]map[snapshotKey]bool),
heightIndex: make(map[uint64]map[snapshotKey]bool),
peerIndex: make(map[types.NodeID]map[snapshotKey]bool),
formatBlacklist: make(map[uint32]bool),
@@ -111,10 +111,10 @@ func (p *snapshotPool) Add(peerID types.NodeID, snapshot *snapshot) (bool, error
}
p.snapshots[key] = snapshot

- if p.formatIndex[snapshot.Version] == nil {
- p.formatIndex[snapshot.Version] = make(map[snapshotKey]bool)
+ if p.versionIndex[snapshot.Version] == nil {
+ p.versionIndex[snapshot.Version] = make(map[snapshotKey]bool)
}
- p.formatIndex[snapshot.Version][key] = true
+ p.versionIndex[snapshot.Version][key] = true

if p.heightIndex[snapshot.Height] == nil {
p.heightIndex[snapshot.Height] = make(map[snapshotKey]bool)
@@ -234,13 +234,13 @@ func (p *snapshotPool) Reject(snapshot *snapshot) {
p.removeSnapshot(key)
}

- // RejectFormat rejects a snapshot format. It will never be used again.
- func (p *snapshotPool) RejectFormat(format uint32) {
+ // RejectVersion rejects a snapshot version. It will never be used again.
+ func (p *snapshotPool) RejectVersion(version uint32) {
p.Lock()
defer p.Unlock()

- p.formatBlacklist[format] = true
- for key := range p.formatIndex[format] {
+ p.formatBlacklist[version] = true
+ for key := range p.versionIndex[version] {
p.removeSnapshot(key)
}
}
@@ -285,7 +285,7 @@ func (p *snapshotPool) removeSnapshot(key snapshotKey) {
}

delete(p.snapshots, key)
- delete(p.formatIndex[snapshot.Version], key)
+ delete(p.versionIndex[snapshot.Version], key)
delete(p.heightIndex[snapshot.Height], key)
for peerID := range p.snapshotPeers[key] {
delete(p.peerIndex[peerID], key)
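The formatIndex -> versionIndex rename runs through all of the pool's secondary-index bookkeeping. A compact sketch of how such an index supports RejectVersion-style bulk removal (simplified types; the real pool also maintains height and peer indexes):

package main

import "fmt"

type snapshotKey string

type pool struct {
    versionIndex map[uint32]map[snapshotKey]bool
}

// add records a snapshot key under its version.
func (p *pool) add(version uint32, key snapshotKey) {
    if p.versionIndex[version] == nil {
        p.versionIndex[version] = make(map[snapshotKey]bool)
    }
    p.versionIndex[version][key] = true
}

// rejectVersion drops every snapshot recorded under the version,
// mirroring RejectVersion above.
func (p *pool) rejectVersion(version uint32) []snapshotKey {
    var removed []snapshotKey
    for key := range p.versionIndex[version] {
        removed = append(removed, key)
        delete(p.versionIndex[version], key)
    }
    return removed
}

func main() {
    p := &pool{versionIndex: make(map[uint32]map[snapshotKey]bool)}
    p.add(1, "h3:v1")
    p.add(2, "h3:v2")
    fmt.Println(p.rejectVersion(1)) // [h3:v1]
}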
2 changes: 1 addition & 1 deletion internal/statesync/snapshots_test.go
@@ -206,7 +206,7 @@ func TestSnapshotPool_RejectFormat(t *testing.T) {
require.NoError(t, err)
}

- pool.RejectFormat(1)
+ pool.RejectVersion(1)
require.Equal(t, []*snapshot{snapshots[0], snapshots[2]}, pool.Ranked())

added, err := pool.Add(peerID, &snapshot{Height: 3, Version: 1, Hash: []byte{1}})
30 changes: 17 additions & 13 deletions internal/statesync/syncer.go
@@ -49,7 +49,8 @@ var (
// errTimeout is returned by Sync() when we've waited too long to receive a chunk.
errTimeout = errors.New("timed out waiting for chunk")
// errNoSnapshots is returned by SyncAny() if no snapshots are found and discovery is disabled.
- errNoSnapshots = errors.New("no suitable snapshots found")
+ errNoSnapshots            = errors.New("no suitable snapshots found")
+ errStatesyncNotInProgress = errors.New("no state sync in progress")
)

// syncer runs a state sync against an ABCI app. Use either SyncAny() to automatically attempt to
Expand Down Expand Up @@ -83,22 +84,25 @@ func (s *syncer) AddChunk(chunk *chunk) (bool, error) {
s.mtx.RLock()
defer s.mtx.RUnlock()
if s.chunkQueue == nil {
- return false, errors.New("no state sync in progress")
+ return false, errStatesyncNotInProgress
}
+ keyVals := []any{
+ "height", chunk.Height,
+ "version", chunk.Version,
+ "chunk", chunk.ID,
+ }
added, err := s.chunkQueue.Add(chunk)
if err != nil {
+ if errors.Is(err, errNilSnapshot) {
+ s.logger.Error("Can't add a chunk because the snapshot is nil", keyVals...)
+ return false, nil
+ }
return false, err
}
if added {
s.logger.Debug("Added chunk to queue",
"height", chunk.Height,
"format", chunk.Version,
"chunk", chunk.ID)
s.logger.Debug("Added chunk to queue", keyVals...)
} else {
s.logger.Debug("Ignoring duplicate chunk in requestQueue",
"height", chunk.Height,
"format", chunk.Version,
"chunk", chunk.ID)
s.logger.Debug("Ignoring duplicate chunk in requestQueue", keyVals...)
}
return added, nil
}
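
Collecting the key/value pairs once in keyVals and splatting them into each log call removes the duplicated argument lists (and the old inconsistent "format" key). A minimal sketch of the same technique using the standard library's log/slog (the PR uses its own logger interface):

package main

import "log/slog"

func main() {
    // Build the structured-log fields once, reuse them everywhere.
    keyVals := []any{"height", uint64(100), "version", uint32(1), "chunk", "ab12"}
    slog.Info("Added chunk to queue", keyVals...)
    slog.Info("Ignoring duplicate chunk in requestQueue", keyVals...)
}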
@@ -240,7 +244,7 @@ func (s *syncer) SyncAny(
"hash", snapshot.Hash)

case errors.Is(err, errRejectFormat):
- s.snapshots.RejectFormat(snapshot.Version)
+ s.snapshots.RejectVersion(snapshot.Version)
s.logger.Info("Snapshot format rejected", "format", snapshot.Version)

case errors.Is(err, errRejectSender):
@@ -300,7 +304,7 @@ func (s *syncer) Sync(ctx context.Context, snapshot *snapshot, queue *chunkQueue
fmt.Errorf("failed to get app hash at height %d. No witnesses remaining", snapshot.Height)
}
s.logger.Info("failed to get and verify tendermint state. Dropping snapshot and trying again",
"err", err,
"error", err,
"height", snapshot.Height)
return sm.State{}, nil, errRejectSnapshot
}
@@ -496,7 +500,7 @@ func (s *syncer) fetchChunks(ctx context.Context, snapshot *snapshot, queue *chu
dequeueChunkIDTimeout = dequeueChunkIDTimeoutDefault
}
for {
- if queue.IsRequestEmpty() {
+ if queue.IsRequestQueueEmpty() {
select {
case <-ctx.Done():
return
6 changes: 3 additions & 3 deletions internal/statesync/syncer_test.go
@@ -627,7 +627,7 @@ func (suite *SyncerTestSuite) TestApplyChunksResults() {
}
go func() {
for i := 0; i < len(tc.resps); i++ {
- for chunks.IsRequestEmpty() {
+ for chunks.IsRequestQueueEmpty() {
time.Sleep(5 * time.Millisecond)
}
chunkID, err := chunks.Dequeue()
@@ -704,7 +704,7 @@ func (suite *SyncerTestSuite) TestApplyChunksRefetchChunks() {
}
go func() {
for i := 0; i < len(tc.resp); i++ {
- for queue.IsRequestEmpty() {
+ for queue.IsRequestQueueEmpty() {
time.Sleep(10 * time.Millisecond)
}
chunkID, err := queue.Dequeue()
@@ -822,7 +822,7 @@ func (suite *SyncerTestSuite) TestApplyChunksRejectSenders() {

go func() {
for i := 0; i < len(tc.resps); i++ {
- for queue.IsRequestEmpty() {
+ for queue.IsRequestQueueEmpty() {
time.Sleep(10 * time.Millisecond)
}
chunkID, err := queue.Dequeue()
1 change: 1 addition & 0 deletions libs/ds/ordered_map.go
@@ -1,6 +1,7 @@
package ds

// OrderedMap is a map with a deterministic iteration order
+ // This data structure is not thread-safe
type OrderedMap[T comparable, V any] struct {
keys map[T]int
values []V
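For context, a minimal generic ordered map in the spirit of this package (the keys and values fields match the diff, but the methods here are illustrative, not the package's actual API). As the new comment warns, none of it is safe for concurrent use without external locking:

package main

import "fmt"

type OrderedMap[T comparable, V any] struct {
    keys   map[T]int // key -> insertion index into values
    values []V
}

func NewOrderedMap[T comparable, V any]() *OrderedMap[T, V] {
    return &OrderedMap[T, V]{keys: make(map[T]int)}
}

// Put inserts or overwrites; the first insertion fixes the key's position.
func (m *OrderedMap[T, V]) Put(key T, value V) {
    if i, ok := m.keys[key]; ok {
        m.values[i] = value
        return
    }
    m.keys[key] = len(m.values)
    m.values = append(m.values, value)
}

func (m *OrderedMap[T, V]) Get(key T) (V, bool) {
    i, ok := m.keys[key]
    if !ok {
        var zero V
        return zero, false
    }
    return m.values[i], true
}

// Values iterates deterministically, in insertion order.
func (m *OrderedMap[T, V]) Values() []V { return m.values }

func main() {
    m := NewOrderedMap[string, int]()
    m.Put("b", 2)
    m.Put("a", 1)
    m.Put("b", 20)
    fmt.Println(m.Values()) // [20 1] — insertion order, regardless of key
}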
5 changes: 2 additions & 3 deletions proto/tendermint/abci/types.proto
@@ -161,7 +161,7 @@ message RequestOfferSnapshot {
message RequestLoadSnapshotChunk {
uint64 height = 1; // The height of the snapshot the chunks belongs to.
uint32 version = 2; // The application-specific format of the snapshot the chunk belongs to.
- bytes chunk_id = 3; // The chunk index, starting from 0 for the initial chunk.
+ bytes chunk_id = 3; // The chunk ID is a hash of a subtree node of the snapshot in GroveDB.
}

// Applies a snapshot chunk.
Expand Down Expand Up @@ -828,8 +828,7 @@ message Misbehavior {

message Snapshot {
uint64 height = 1; // The height at which the snapshot was taken
- uint32 version = 2; // The application-specific snapshot version
- optional uint32 chunks = 3; // Number of chunks in the snapshot
+ uint32 version = 2; // The application-specific snapshot version
bytes hash = 4; // Arbitrary snapshot hash, equal only if identical
bytes metadata = 5; // Arbitrary application metadata
}
Expand Down