Skip to content

Commit

Permalink
save
Browse files Browse the repository at this point in the history
  • Loading branch information
Giulio2002 committed Nov 14, 2024
1 parent c7f4ff3 commit 7651fd1
Show file tree
Hide file tree
Showing 2 changed files with 40 additions and 51 deletions.
9 changes: 6 additions & 3 deletions cl/phase1/forkchoice/fork_graph/fork_graph_disk.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,12 @@
package fork_graph

import (
"bytes"
"errors"
"fmt"
"sync"
"sync/atomic"

"github.com/golang/snappy"
"github.com/spf13/afero"

libcommon "github.com/erigontech/erigon-lib/common"
Expand Down Expand Up @@ -125,8 +125,9 @@ type forkGraphDisk struct {
lightClientUpdates sync.Map // period -> lightclientupdate

// reusable buffers
sszBuffer bytes.Buffer
sszSnappyBuffer bytes.Buffer
sszBuffer []byte
sszSnappyWriter *snappy.Writer
sszSnappyReader *snappy.Reader

rcfg beacon_router_configuration.RouterConfiguration
emitter *beaconevents.EventEmitter
Expand Down Expand Up @@ -161,6 +162,8 @@ func NewForkGraphDisk(anchorState *state.CachingBeaconState, aferoFs afero.Fs, r
f.headers.Store(libcommon.Hash(anchorRoot), &anchorHeader)

f.DumpBeaconStateOnDisk(anchorRoot, anchorState, true)
// preallocate buffer
f.sszBuffer = make([]byte, 0, (anchorState.EncodingSizeSSZ()*3)/2)
return f
}

Expand Down
82 changes: 34 additions & 48 deletions cl/phase1/forkchoice/fork_graph/fork_graph_disk_fs.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ package fork_graph

import (
"encoding/binary"
"errors"
"fmt"
"io"
"os"
Expand All @@ -41,50 +40,42 @@ func getBeaconStateCacheFilename(blockRoot libcommon.Hash) string {
func (f *forkGraphDisk) readBeaconStateFromDisk(blockRoot libcommon.Hash) (bs *state.CachingBeaconState, err error) {
var file afero.File
file, err = f.fs.Open(getBeaconStateFilename(blockRoot))

if err != nil {
return
}
defer file.Close()

if f.sszSnappyReader == nil {
f.sszSnappyReader = snappy.NewReader(file)
} else {
f.sszSnappyReader.Reset(file)
}
// Read the version
v := []byte{0}
if _, err := file.Read(v); err != nil {
if _, err := f.sszSnappyReader.Read(v); err != nil {
return nil, fmt.Errorf("failed to read hard fork version: %w, root: %x", err, blockRoot)
}
// Read the length
lengthBytes := make([]byte, 8)
var n int
n, err = io.ReadFull(file, lengthBytes)
n, err = io.ReadFull(f.sszSnappyReader, lengthBytes)
if err != nil {
return nil, fmt.Errorf("failed to read length: %w, root: %x", err, blockRoot)
}
if n != 8 {
return nil, fmt.Errorf("failed to read length: %d, want 8, root: %x", n, blockRoot)
}
// Grow the snappy buffer
f.sszSnappyBuffer.Grow(int(binary.BigEndian.Uint64(lengthBytes)))
// Read the snappy buffer
sszSnappyBuffer := f.sszSnappyBuffer.Bytes()
sszSnappyBuffer = sszSnappyBuffer[:cap(sszSnappyBuffer)]
n, err = io.ReadFull(file, sszSnappyBuffer)
if err != nil && !errors.Is(err, io.ErrUnexpectedEOF) {
return nil, fmt.Errorf("failed to read snappy buffer: %w, root: %x", err, blockRoot)
}

decLen, err := snappy.DecodedLen(sszSnappyBuffer[:n])
if err != nil {
return nil, fmt.Errorf("failed to get decoded length: %w, root: %x, len: %d", err, blockRoot, n)
}
// Grow the plain ssz buffer
f.sszBuffer.Grow(decLen)
sszBuffer := f.sszBuffer.Bytes()
sszBuffer, err = snappy.Decode(sszBuffer, sszSnappyBuffer[:n])
f.sszBuffer = f.sszBuffer[:binary.BigEndian.Uint64(lengthBytes)]
n, err = io.ReadFull(f.sszSnappyReader, f.sszBuffer)
if err != nil {
return nil, fmt.Errorf("failed to decode snappy buffer: %w, root: %x, len: %d, decLen: %d", err, blockRoot, n, decLen)
return nil, fmt.Errorf("failed to read snappy buffer: %w, root: %x", err, blockRoot)
}
f.sszBuffer = f.sszBuffer[:n]

bs = state.New(f.beaconCfg)
if err = bs.DecodeSSZ(sszBuffer, int(v[0])); err != nil {
return nil, fmt.Errorf("failed to decode beacon state: %w, root: %x, len: %d, decLen: %d, bs: %+v", err, blockRoot, n, decLen, bs)
if err = bs.DecodeSSZ(f.sszBuffer, int(v[0])); err != nil {
return nil, fmt.Errorf("failed to decode beacon state: %w, root: %x, len: %d, decLen: %d, bs: %+v", err, blockRoot, n, len(f.sszBuffer), bs)
}
// decode the cache file
cacheFile, err := f.fs.Open(getBeaconStateCacheFilename(blockRoot))
Expand All @@ -106,47 +97,42 @@ func (f *forkGraphDisk) DumpBeaconStateOnDisk(blockRoot libcommon.Hash, bs *stat
return
}
// Truncate and then grow the buffer to the size of the state.
encodingSizeSSZ := bs.EncodingSizeSSZ()
f.sszBuffer.Grow(encodingSizeSSZ)
f.sszBuffer.Reset()

sszBuffer := f.sszBuffer.Bytes()
sszBuffer, err = bs.EncodeSSZ(sszBuffer)
f.sszBuffer, err = bs.EncodeSSZ(f.sszBuffer[:0])
if err != nil {
return
}
// Grow the snappy buffer
f.sszSnappyBuffer.Grow(snappy.MaxEncodedLen(len(sszBuffer)))
// Compress the ssz buffer
sszSnappyBuffer := f.sszSnappyBuffer.Bytes()
sszSnappyBuffer = sszSnappyBuffer[:cap(sszSnappyBuffer)]
sszSnappyBuffer = snappy.Encode(sszSnappyBuffer, sszBuffer)

var dumpedFile afero.File
dumpedFile, err = f.fs.OpenFile(getBeaconStateFilename(blockRoot), os.O_TRUNC|os.O_CREATE|os.O_RDWR, 0o755)
if err != nil {
return
}
defer dumpedFile.Close()

if f.sszSnappyWriter == nil {
f.sszSnappyWriter = snappy.NewBufferedWriter(dumpedFile)
} else {
f.sszSnappyWriter.Reset(dumpedFile)
}

// First write the hard fork version
_, err = dumpedFile.Write([]byte{byte(bs.Version())})
if err != nil {
return
if _, err := f.sszSnappyWriter.Write([]byte{byte(bs.Version())}); err != nil {
return err
}
// Second write the length
length := make([]byte, 8)
binary.BigEndian.PutUint64(length, uint64(len(sszSnappyBuffer)))
_, err = dumpedFile.Write(length)
if err != nil {
return
binary.BigEndian.PutUint64(length, uint64(len(f.sszBuffer)))
if _, err := f.sszSnappyWriter.Write(length); err != nil {
return err
}
// Lastly dump the state
_, err = dumpedFile.Write(sszSnappyBuffer)
if err != nil {
if _, err := f.sszSnappyWriter.Write(f.sszBuffer); err != nil {
return err
}
if err = f.sszSnappyWriter.Flush(); err != nil {
return
}

err = dumpedFile.Sync()
if err != nil {
if err = dumpedFile.Sync(); err != nil {
return
}

Expand Down

0 comments on commit 7651fd1

Please sign in to comment.