Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(statesync): implement statesync spec for the new approach #655

Merged
merged 22 commits into from
Jul 19, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 10 additions & 5 deletions abci/example/kvstore/kvstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -493,7 +493,7 @@ func (app *Application) LoadSnapshotChunk(_ context.Context, req *abci.RequestLo
app.mu.Lock()
defer app.mu.Unlock()

chunk, err := app.snapshots.LoadChunk(req.Height, req.Format, req.Chunk)
chunk, err := app.snapshots.LoadChunk(req.Height, req.Version, req.ChunkId)
if err != nil {
return &abci.ResponseLoadSnapshotChunk{}, err
}
Expand Down Expand Up @@ -523,7 +523,11 @@ func (app *Application) ApplySnapshotChunk(_ context.Context, req *abci.RequestA
if app.offerSnapshot == nil {
return &abci.ResponseApplySnapshotChunk{}, fmt.Errorf("no restore in progress")
}
app.offerSnapshot.addChunk(int(req.Index), req.Chunk)

resp := &abci.ResponseApplySnapshotChunk{
Result: abci.ResponseApplySnapshotChunk_ACCEPT,
NextChunks: app.offerSnapshot.addChunk(req.ChunkId, req.Chunk),
}

if app.offerSnapshot.isFull() {
chunks := app.offerSnapshot.bytes()
Expand All @@ -538,11 +542,10 @@ func (app *Application) ApplySnapshotChunk(_ context.Context, req *abci.RequestA
"snapshot_height", app.offerSnapshot.snapshot.Height,
"snapshot_apphash", app.offerSnapshot.appHash,
)
resp.Result = abci.ResponseApplySnapshotChunk_COMPLETE_SNAPSHOT
app.offerSnapshot = nil
}

resp := &abci.ResponseApplySnapshotChunk{Result: abci.ResponseApplySnapshotChunk_ACCEPT}

app.logger.Debug("ApplySnapshotChunk", "resp", resp)
return resp, nil
}
Expand All @@ -556,7 +559,9 @@ func (app *Application) createSnapshot() error {
if err != nil {
return fmt.Errorf("create snapshot: %w", err)
}
app.logger.Info("created state sync snapshot", "height", height, "apphash", app.LastCommittedState.GetAppHash())
app.logger.Info("created state sync snapshot",
"height", height,
"apphash", app.LastCommittedState.GetAppHash())
err = app.snapshots.Prune(maxSnapshotCount)
if err != nil {
return fmt.Errorf("prune snapshots: %w", err)
Expand Down
30 changes: 13 additions & 17 deletions abci/example/kvstore/kvstore_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -493,24 +493,20 @@ func TestSnapshots(t *testing.T) {
})
require.NoError(t, err)
assert.Equal(t, types.ResponseOfferSnapshot_ACCEPT, respOffer.Result)
loaded, err := app.LoadSnapshotChunk(ctx, &types.RequestLoadSnapshotChunk{
Height: recentSnapshot.Height,
ChunkId: recentSnapshot.Hash,
Version: recentSnapshot.Version,
})
require.NoError(t, err)

for chunk := uint32(0); chunk < recentSnapshot.Chunks; chunk++ {
loaded, err := app.LoadSnapshotChunk(ctx, &types.RequestLoadSnapshotChunk{
Height: recentSnapshot.Height,
Chunk: chunk,
Format: recentSnapshot.Format,
})
require.NoError(t, err)

applied, err := dstApp.ApplySnapshotChunk(ctx, &types.RequestApplySnapshotChunk{
Index: chunk,
Chunk: loaded.Chunk,
Sender: "app",
})
require.NoError(t, err)
assert.Equal(t, types.ResponseApplySnapshotChunk_ACCEPT, applied.Result)
}

applied, err := dstApp.ApplySnapshotChunk(ctx, &types.RequestApplySnapshotChunk{
ChunkId: recentSnapshot.Hash,
Chunk: loaded.Chunk,
Sender: "app",
})
require.NoError(t, err)
assert.Equal(t, types.ResponseApplySnapshotChunk_COMPLETE_SNAPSHOT, applied.Result)
infoResp, err := dstApp.Info(ctx, &types.RequestInfo{})
require.NoError(t, err)
assertRespInfo(t, int64(recentSnapshot.Height), appHashes[snapshotHeight], *infoResp)
Expand Down
113 changes: 73 additions & 40 deletions abci/example/kvstore/snapshots.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,10 @@ package kvstore

import (
"bytes"
"encoding/hex"
"encoding/json"
"errors"
"fmt"
"math"
"os"
"path/filepath"

Expand All @@ -15,6 +15,7 @@ import (
abci "github.com/tendermint/tendermint/abci/types"
"github.com/tendermint/tendermint/crypto"
tmbytes "github.com/tendermint/tendermint/libs/bytes"
"github.com/tendermint/tendermint/libs/ds"
)

const (
Expand All @@ -27,11 +28,17 @@ const (
// SnapshotStore stores state sync snapshots. Snapshots are stored simply as
// JSON files, and chunks are generated on-the-fly by splitting the JSON data
// into fixed-size chunks.
type SnapshotStore struct {
sync.RWMutex
dir string
metadata []abci.Snapshot
}
type (
SnapshotStore struct {
sync.RWMutex
dir string
metadata []abci.Snapshot
}
chunkItem struct {
Data []byte `json:"data"`
NextChunkIDs [][]byte `json:"nextChunkIDs"`
}
)

// NewSnapshotStore creates a new snapshot store.
func NewSnapshotStore(dir string) (*SnapshotStore, error) {
Expand All @@ -49,7 +56,7 @@ func NewSnapshotStore(dir string) (*SnapshotStore, error) {
// called internally on construction.
func (s *SnapshotStore) loadMetadata() error {
file := filepath.Join(s.dir, "metadata.json")
metadata := []abci.Snapshot{}
var metadata []abci.Snapshot

bz, err := os.ReadFile(file)
switch {
Expand Down Expand Up @@ -96,10 +103,9 @@ func (s *SnapshotStore) Create(state State) (abci.Snapshot, error) {
}
height := state.GetHeight()
snapshot := abci.Snapshot{
Height: uint64(height),
Format: 1,
Hash: crypto.Checksum(bz),
Chunks: byteChunks(bz),
Height: uint64(height),
Version: 1,
Hash: crypto.Checksum(bz),
}
err = os.WriteFile(filepath.Join(s.dir, fmt.Sprintf("%v.json", height)), bz, 0644)
if err != nil {
Expand Down Expand Up @@ -152,16 +158,18 @@ func (s *SnapshotStore) List() ([]*abci.Snapshot, error) {
}

// LoadChunk loads a snapshot chunk.
func (s *SnapshotStore) LoadChunk(height uint64, format uint32, chunk uint32) ([]byte, error) {
func (s *SnapshotStore) LoadChunk(height uint64, version uint32, chunkID []byte) ([]byte, error) {
s.RLock()
defer s.RUnlock()
for _, snapshot := range s.metadata {
if snapshot.Height == height && snapshot.Format == format {
bz, err := os.ReadFile(filepath.Join(s.dir, fmt.Sprintf("%v.json", height)))
if snapshot.Height == height && snapshot.Version == version {
bz, err := os.ReadFile(filepath.Join(s.dir, fmt.Sprintf("%d.json", height)))
if err != nil {
return nil, err
}
return byteChunk(bz, chunk), nil
chunks := makeChunks(bz, snapshotChunkSize)
item := makeChunkItem(chunks, chunkID)
return json.Marshal(item)
}
}
return nil, nil
Expand All @@ -170,54 +178,79 @@ func (s *SnapshotStore) LoadChunk(height uint64, format uint32, chunk uint32) ([
type offerSnapshot struct {
snapshot *abci.Snapshot
appHash tmbytes.HexBytes
chunks [][]byte
chunkCnt int
chunks *ds.OrderedMap[string, []byte]
}

func newOfferSnapshot(snapshot *abci.Snapshot, appHash tmbytes.HexBytes) *offerSnapshot {
return &offerSnapshot{
snapshot: snapshot,
appHash: appHash,
chunks: make([][]byte, snapshot.Chunks),
chunkCnt: 0,
chunks: ds.NewOrderedMap[string, []byte](),
}
}

func (s *offerSnapshot) addChunk(index int, chunk []byte) {
if s.chunks[index] != nil {
return
func (s *offerSnapshot) addChunk(chunkID tmbytes.HexBytes, data []byte) [][]byte {
chunkIDStr := chunkID.String()
if s.chunks.Has(chunkIDStr) {
return nil
}
s.chunks[index] = chunk
s.chunkCnt++
var item chunkItem
err := json.Unmarshal(data, &item)
if err != nil {
panic("failed to decode a chunk data: " + err.Error())
}
s.chunks.Put(chunkIDStr, item.Data)
return item.NextChunkIDs
}

func (s *offerSnapshot) isFull() bool {
return s.chunkCnt == int(s.snapshot.Chunks)
return bytes.Equal(crypto.Checksum(s.bytes()), s.snapshot.Hash)
}

func (s *offerSnapshot) bytes() []byte {
chunks := s.chunks.Values()
buf := bytes.NewBuffer(nil)
for _, chunk := range s.chunks {
for _, chunk := range chunks {
buf.Write(chunk)
}
return buf.Bytes()
}

// byteChunk returns the chunk at a given index from the full byte slice.
func byteChunk(bz []byte, index uint32) []byte {
start := int(index * snapshotChunkSize)
end := int((index + 1) * snapshotChunkSize)
switch {
case start >= len(bz):
return nil
case end >= len(bz):
return bz[start:]
default:
return bz[start:end]
// makeChunkItem returns the chunk at a given index from the full byte slice.
func makeChunkItem(chunks *ds.OrderedMap[string, []byte], chunkID []byte) chunkItem {
chunkIDStr := hex.EncodeToString(chunkID)
val, ok := chunks.Get(chunkIDStr)
if !ok {
panic("chunk not found")
}
chunkIDs := chunks.Keys()
ci := chunkItem{Data: val}
i := 0
for ; i < len(chunkIDs) && chunkIDs[i] != chunkIDStr; i++ {
}
if i+1 < len(chunkIDs) {
data, err := hex.DecodeString(chunkIDs[i+1])
if err != nil {
panic(err)
}
ci.NextChunkIDs = [][]byte{data}
}
return ci
}

// byteChunks calculates the number of chunks in the byte slice.
func byteChunks(bz []byte) uint32 {
return uint32(math.Ceil(float64(len(bz)) / snapshotChunkSize))
func makeChunks(bz []byte, chunkSize int) *ds.OrderedMap[string, []byte] {
chunks := ds.NewOrderedMap[string, []byte]()
totalHash := hex.EncodeToString(crypto.Checksum(bz))
key := totalHash
for i := 0; i < len(bz); i += chunkSize {
j := i + chunkSize
if j > len(bz) {
j = len(bz)
}
if i > 1 {
key = hex.EncodeToString(crypto.Checksum(bz[i:j]))
}
chunks.Put(key, append([]byte(nil), bz[i:j]...))
}
return chunks
}
37 changes: 37 additions & 0 deletions abci/example/kvstore/snapshots_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
package kvstore

import (
"encoding/hex"
"math/rand"
"testing"

"github.com/stretchr/testify/require"
)

func TestChunkItem(t *testing.T) {
const size = 64
chunks := makeChunks(makeBytes(1032), size)
keys := chunks.Keys()
values := chunks.Values()
for i, key := range keys {
chunkID, err := hex.DecodeString(key)
require.NoError(t, err)
item := makeChunkItem(chunks, chunkID)
require.Equal(t, values[i], item.Data)
if i+1 < len(keys) {
nextChunkID, err := hex.DecodeString(keys[i+1])
require.NoError(t, err)
require.Equal(t, [][]byte{nextChunkID}, item.NextChunkIDs)
} else {
require.Nil(t, item.NextChunkIDs)
}
}
}

func makeBytes(size int) []byte {
bz := make([]byte, size)
for i := 0; i < size; i++ {
bz[i] = byte(rand.Int63n(256))
}
return bz
}
Loading