Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore(release): update changelog and bump version to 0.13.0-dev.2 #662

Closed
wants to merge 24 commits
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
f44e0d4
feat: add ordered_map.go
shotonoff Jul 3, 2023
2ae8018
feat: introduce a new approach of a state sync
shotonoff Jul 3, 2023
6076550
fix: kvstore_test.go
shotonoff Jul 3, 2023
ab635e9
fix: link issues
shotonoff Jul 3, 2023
0b37f30
fix: data race in syncer_test.go
shotonoff Jul 3, 2023
97a6dab
refactor: regenerate proto files
shotonoff Jul 3, 2023
8b52cb9
chore: regenerate proto file
shotonoff Jul 3, 2023
165f42a
refactor: modify kvstore to be compatible with a new statesync approach
shotonoff Jul 4, 2023
1317f53
refactor: sort import
shotonoff Jul 4, 2023
38cefed
fix: kvstore_test.go
shotonoff Jul 4, 2023
948ce0e
fix: data race in syncer.go
shotonoff Jul 4, 2023
6e6e424
chore: fix cs
shotonoff Jul 4, 2023
fde951e
Update internal/statesync/chunks.go
shotonoff Jul 17, 2023
1526126
Update internal/statesync/syncer.go
shotonoff Jul 17, 2023
98db32a
Update internal/statesync/chunks.go
shotonoff Jul 17, 2023
23b2c41
refactor: modifications according to PR feedback
shotonoff Jul 17, 2023
2f88a3a
fix: TestChunkQueue
shotonoff Jul 17, 2023
74d2c3d
refactor: apply the changes according to PR
shotonoff Jul 18, 2023
5ab453c
chore: regenerate proto file
shotonoff Jul 19, 2023
8db1715
chore: regenerate all mockery files
shotonoff Jul 19, 2023
8e61d55
chore: regen proto files
lklimek Jul 19, 2023
8047275
chore: make mockery
lklimek Jul 19, 2023
f8da9f7
Merge pull request #655 from dashpay/feat/statesync-implement-spec-fo…
shotonoff Jul 19, 2023
57788e3
chore(release): update changelog and version to 0.13.0-dev.2
shotonoff Jul 19, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
829 changes: 454 additions & 375 deletions CHANGELOG.md

Large diffs are not rendered by default.

15 changes: 10 additions & 5 deletions abci/example/kvstore/kvstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -493,7 +493,7 @@ func (app *Application) LoadSnapshotChunk(_ context.Context, req *abci.RequestLo
app.mu.Lock()
defer app.mu.Unlock()

chunk, err := app.snapshots.LoadChunk(req.Height, req.Format, req.Chunk)
chunk, err := app.snapshots.LoadChunk(req.Height, req.Version, req.ChunkId)
if err != nil {
return &abci.ResponseLoadSnapshotChunk{}, err
}
Expand Down Expand Up @@ -523,7 +523,11 @@ func (app *Application) ApplySnapshotChunk(_ context.Context, req *abci.RequestA
if app.offerSnapshot == nil {
return &abci.ResponseApplySnapshotChunk{}, fmt.Errorf("no restore in progress")
}
app.offerSnapshot.addChunk(int(req.Index), req.Chunk)

resp := &abci.ResponseApplySnapshotChunk{
Result: abci.ResponseApplySnapshotChunk_ACCEPT,
NextChunks: app.offerSnapshot.addChunk(req.ChunkId, req.Chunk),
}

if app.offerSnapshot.isFull() {
chunks := app.offerSnapshot.bytes()
Expand All @@ -538,11 +542,10 @@ func (app *Application) ApplySnapshotChunk(_ context.Context, req *abci.RequestA
"snapshot_height", app.offerSnapshot.snapshot.Height,
"snapshot_apphash", app.offerSnapshot.appHash,
)
resp.Result = abci.ResponseApplySnapshotChunk_COMPLETE_SNAPSHOT
app.offerSnapshot = nil
}

resp := &abci.ResponseApplySnapshotChunk{Result: abci.ResponseApplySnapshotChunk_ACCEPT}

app.logger.Debug("ApplySnapshotChunk", "resp", resp)
return resp, nil
}
Expand All @@ -556,7 +559,9 @@ func (app *Application) createSnapshot() error {
if err != nil {
return fmt.Errorf("create snapshot: %w", err)
}
app.logger.Info("created state sync snapshot", "height", height, "apphash", app.LastCommittedState.GetAppHash())
app.logger.Info("created state sync snapshot",
"height", height,
"apphash", app.LastCommittedState.GetAppHash())
err = app.snapshots.Prune(maxSnapshotCount)
if err != nil {
return fmt.Errorf("prune snapshots: %w", err)
Expand Down
30 changes: 13 additions & 17 deletions abci/example/kvstore/kvstore_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -493,24 +493,20 @@ func TestSnapshots(t *testing.T) {
})
require.NoError(t, err)
assert.Equal(t, types.ResponseOfferSnapshot_ACCEPT, respOffer.Result)
loaded, err := app.LoadSnapshotChunk(ctx, &types.RequestLoadSnapshotChunk{
Height: recentSnapshot.Height,
ChunkId: recentSnapshot.Hash,
Version: recentSnapshot.Version,
})
require.NoError(t, err)

for chunk := uint32(0); chunk < recentSnapshot.Chunks; chunk++ {
loaded, err := app.LoadSnapshotChunk(ctx, &types.RequestLoadSnapshotChunk{
Height: recentSnapshot.Height,
Chunk: chunk,
Format: recentSnapshot.Format,
})
require.NoError(t, err)

applied, err := dstApp.ApplySnapshotChunk(ctx, &types.RequestApplySnapshotChunk{
Index: chunk,
Chunk: loaded.Chunk,
Sender: "app",
})
require.NoError(t, err)
assert.Equal(t, types.ResponseApplySnapshotChunk_ACCEPT, applied.Result)
}

applied, err := dstApp.ApplySnapshotChunk(ctx, &types.RequestApplySnapshotChunk{
ChunkId: recentSnapshot.Hash,
Chunk: loaded.Chunk,
Sender: "app",
})
require.NoError(t, err)
assert.Equal(t, types.ResponseApplySnapshotChunk_COMPLETE_SNAPSHOT, applied.Result)
infoResp, err := dstApp.Info(ctx, &types.RequestInfo{})
require.NoError(t, err)
assertRespInfo(t, int64(recentSnapshot.Height), appHashes[snapshotHeight], *infoResp)
Expand Down
113 changes: 73 additions & 40 deletions abci/example/kvstore/snapshots.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,10 @@ package kvstore

import (
"bytes"
"encoding/hex"
"encoding/json"
"errors"
"fmt"
"math"
"os"
"path/filepath"

Expand All @@ -15,6 +15,7 @@ import (
abci "github.com/tendermint/tendermint/abci/types"
"github.com/tendermint/tendermint/crypto"
tmbytes "github.com/tendermint/tendermint/libs/bytes"
"github.com/tendermint/tendermint/libs/ds"
)

const (
Expand All @@ -27,11 +28,17 @@ const (
// SnapshotStore stores state sync snapshots. Snapshots are stored simply as
// JSON files, and chunks are generated on-the-fly by splitting the JSON data
// into fixed-size chunks.
type SnapshotStore struct {
sync.RWMutex
dir string
metadata []abci.Snapshot
}
type (
	// SnapshotStore stores state sync snapshots on disk. The embedded
	// RWMutex guards dir and metadata against concurrent access.
	SnapshotStore struct {
		sync.RWMutex
		dir      string
		metadata []abci.Snapshot
	}
	// chunkItem is the JSON payload exchanged for a single snapshot chunk:
	// the chunk's raw bytes plus the IDs of the chunk(s) to request next.
	chunkItem struct {
		Data         []byte   `json:"data"`
		NextChunkIDs [][]byte `json:"nextChunkIDs"`
	}
)

// NewSnapshotStore creates a new snapshot store.
func NewSnapshotStore(dir string) (*SnapshotStore, error) {
Expand All @@ -49,7 +56,7 @@ func NewSnapshotStore(dir string) (*SnapshotStore, error) {
// called internally on construction.
func (s *SnapshotStore) loadMetadata() error {
file := filepath.Join(s.dir, "metadata.json")
metadata := []abci.Snapshot{}
var metadata []abci.Snapshot

bz, err := os.ReadFile(file)
switch {
Expand Down Expand Up @@ -96,10 +103,9 @@ func (s *SnapshotStore) Create(state State) (abci.Snapshot, error) {
}
height := state.GetHeight()
snapshot := abci.Snapshot{
Height: uint64(height),
Format: 1,
Hash: crypto.Checksum(bz),
Chunks: byteChunks(bz),
Height: uint64(height),
Version: 1,
Hash: crypto.Checksum(bz),
}
err = os.WriteFile(filepath.Join(s.dir, fmt.Sprintf("%v.json", height)), bz, 0644)
if err != nil {
Expand Down Expand Up @@ -152,16 +158,18 @@ func (s *SnapshotStore) List() ([]*abci.Snapshot, error) {
}

// LoadChunk loads a snapshot chunk.
func (s *SnapshotStore) LoadChunk(height uint64, format uint32, chunk uint32) ([]byte, error) {
func (s *SnapshotStore) LoadChunk(height uint64, version uint32, chunkID []byte) ([]byte, error) {
s.RLock()
defer s.RUnlock()
for _, snapshot := range s.metadata {
if snapshot.Height == height && snapshot.Format == format {
bz, err := os.ReadFile(filepath.Join(s.dir, fmt.Sprintf("%v.json", height)))
if snapshot.Height == height && snapshot.Version == version {
bz, err := os.ReadFile(filepath.Join(s.dir, fmt.Sprintf("%d.json", height)))
if err != nil {
return nil, err
}
return byteChunk(bz, chunk), nil
chunks := makeChunks(bz, snapshotChunkSize)
item := makeChunkItem(chunks, chunkID)
return json.Marshal(item)
}
}
return nil, nil
Expand All @@ -170,54 +178,79 @@ func (s *SnapshotStore) LoadChunk(height uint64, format uint32, chunk uint32) ([
type offerSnapshot struct {
snapshot *abci.Snapshot
appHash tmbytes.HexBytes
chunks [][]byte
chunkCnt int
chunks *ds.OrderedMap[string, []byte]
}

func newOfferSnapshot(snapshot *abci.Snapshot, appHash tmbytes.HexBytes) *offerSnapshot {
return &offerSnapshot{
snapshot: snapshot,
appHash: appHash,
chunks: make([][]byte, snapshot.Chunks),
chunkCnt: 0,
chunks: ds.NewOrderedMap[string, []byte](),
}
}

func (s *offerSnapshot) addChunk(index int, chunk []byte) {
if s.chunks[index] != nil {
return
// addChunk stores a received chunk and returns the IDs of the next chunks
// to request, or nil when this chunk was already received (duplicate).
// The data is expected to be a JSON-encoded chunkItem.
// NOTE(review): this panics on malformed data; chunks arrive from remote
// peers during state sync, so returning an error would be safer — confirm
// callers cannot be fed untrusted bytes here.
func (s *offerSnapshot) addChunk(chunkID tmbytes.HexBytes, data []byte) [][]byte {
	chunkIDStr := chunkID.String()
	// Duplicate chunk: already stored, nothing more to request for it.
	if s.chunks.Has(chunkIDStr) {
		return nil
	}
	var item chunkItem
	err := json.Unmarshal(data, &item)
	if err != nil {
		panic("failed to decode a chunk data: " + err.Error())
	}
	// Keep only the payload; NextChunkIDs is returned to drive the fetch loop.
	s.chunks.Put(chunkIDStr, item.Data)
	return item.NextChunkIDs
}

func (s *offerSnapshot) isFull() bool {
return s.chunkCnt == int(s.snapshot.Chunks)
return bytes.Equal(crypto.Checksum(s.bytes()), s.snapshot.Hash)
}

func (s *offerSnapshot) bytes() []byte {
chunks := s.chunks.Values()
buf := bytes.NewBuffer(nil)
for _, chunk := range s.chunks {
for _, chunk := range chunks {
buf.Write(chunk)
}
return buf.Bytes()
}

// byteChunk returns the chunk at a given index from the full byte slice.
func byteChunk(bz []byte, index uint32) []byte {
start := int(index * snapshotChunkSize)
end := int((index + 1) * snapshotChunkSize)
switch {
case start >= len(bz):
return nil
case end >= len(bz):
return bz[start:]
default:
return bz[start:end]
// makeChunkItem wraps the chunk identified by chunkID together with the ID
// of the chunk that follows it in the ordered map, so a syncing peer can
// request chunks sequentially. The last chunk has no NextChunkIDs.
// Panics if chunkID is not present in chunks.
func makeChunkItem(chunks *ds.OrderedMap[string, []byte], chunkID []byte) chunkItem {
	chunkIDStr := hex.EncodeToString(chunkID)
	data, ok := chunks.Get(chunkIDStr)
	if !ok {
		panic("chunk not found")
	}
	item := chunkItem{Data: data}
	// Locate this chunk's position among the ordered keys to find its successor.
	keys := chunks.Keys()
	pos := 0
	for pos < len(keys) && keys[pos] != chunkIDStr {
		pos++
	}
	if next := pos + 1; next < len(keys) {
		nextID, err := hex.DecodeString(keys[next])
		if err != nil {
			// Keys are produced by hex.EncodeToString, so this is a programmer bug.
			panic(err)
		}
		item.NextChunkIDs = [][]byte{nextID}
	}
	return item
}

// byteChunks calculates the number of chunks in the byte slice.
func byteChunks(bz []byte) uint32 {
return uint32(math.Ceil(float64(len(bz)) / snapshotChunkSize))
// makeChunks splits bz into chunks of at most chunkSize bytes, returned as
// an ordered map. The first chunk is keyed by the hex checksum of the whole
// snapshot (so it can be requested by the snapshot hash); every subsequent
// chunk is keyed by the hex checksum of its own bytes.
func makeChunks(bz []byte, chunkSize int) *ds.OrderedMap[string, []byte] {
	chunks := ds.NewOrderedMap[string, []byte]()
	key := hex.EncodeToString(crypto.Checksum(bz))
	for i := 0; i < len(bz); i += chunkSize {
		j := i + chunkSize
		if j > len(bz) {
			j = len(bz)
		}
		// Every chunk after the first gets its own checksum as key.
		// (Was `i > 1`, which broke for chunkSize == 1: the chunk at
		// offset 1 reused the whole-snapshot key and overwrote the
		// first entry in the map.)
		if i > 0 {
			key = hex.EncodeToString(crypto.Checksum(bz[i:j]))
		}
		// Copy so stored chunks do not alias bz's backing array.
		chunks.Put(key, append([]byte(nil), bz[i:j]...))
	}
	return chunks
}
37 changes: 37 additions & 0 deletions abci/example/kvstore/snapshots_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
package kvstore

import (
"encoding/hex"
"math/rand"
"testing"

"github.com/stretchr/testify/require"
)

// TestChunkItem checks that makeChunkItem returns the stored data for every
// chunk ID and links each chunk to exactly its successor in the ordered map,
// with the final chunk carrying no successor.
func TestChunkItem(t *testing.T) {
	const chunkSize = 64
	chunks := makeChunks(makeBytes(1032), chunkSize)
	ids := chunks.Keys()
	data := chunks.Values()
	for idx := range ids {
		rawID, err := hex.DecodeString(ids[idx])
		require.NoError(t, err)
		item := makeChunkItem(chunks, rawID)
		require.Equal(t, data[idx], item.Data)
		if idx == len(ids)-1 {
			// The last chunk must not point at a successor.
			require.Nil(t, item.NextChunkIDs)
			continue
		}
		next, err := hex.DecodeString(ids[idx+1])
		require.NoError(t, err)
		require.Equal(t, [][]byte{next}, item.NextChunkIDs)
	}
}

// makeBytes returns a slice of size pseudo-random bytes for use in tests.
func makeBytes(size int) []byte {
	out := make([]byte, size)
	for i := range out {
		out[i] = byte(rand.Intn(256))
	}
	return out
}
Loading