diff --git a/cl/phase1/forkchoice/fork_graph/fork_graph_disk.go b/cl/phase1/forkchoice/fork_graph/fork_graph_disk.go index 3068031bebb..177265bfe89 100644 --- a/cl/phase1/forkchoice/fork_graph/fork_graph_disk.go +++ b/cl/phase1/forkchoice/fork_graph/fork_graph_disk.go @@ -17,12 +17,12 @@ package fork_graph import ( - "bytes" "errors" "fmt" "sync" "sync/atomic" + "github.com/golang/snappy" "github.com/spf13/afero" libcommon "github.com/erigontech/erigon-lib/common" @@ -125,8 +125,9 @@ type forkGraphDisk struct { lightClientUpdates sync.Map // period -> lightclientupdate // reusable buffers - sszBuffer bytes.Buffer - sszSnappyBuffer bytes.Buffer + sszBuffer []byte + sszSnappyWriter *snappy.Writer + sszSnappyReader *snappy.Reader rcfg beacon_router_configuration.RouterConfiguration emitter *beaconevents.EventEmitter @@ -161,6 +162,8 @@ func NewForkGraphDisk(anchorState *state.CachingBeaconState, aferoFs afero.Fs, r f.headers.Store(libcommon.Hash(anchorRoot), &anchorHeader) f.DumpBeaconStateOnDisk(anchorRoot, anchorState, true) + // preallocate buffer + f.sszBuffer = make([]byte, 0, (anchorState.EncodingSizeSSZ()*3)/2) return f } diff --git a/cl/phase1/forkchoice/fork_graph/fork_graph_disk_fs.go b/cl/phase1/forkchoice/fork_graph/fork_graph_disk_fs.go index 11a8bc001d1..c0a163ccf65 100644 --- a/cl/phase1/forkchoice/fork_graph/fork_graph_disk_fs.go +++ b/cl/phase1/forkchoice/fork_graph/fork_graph_disk_fs.go @@ -18,7 +18,6 @@ package fork_graph import ( "encoding/binary" - "errors" "fmt" "io" "os" @@ -41,50 +40,42 @@ func getBeaconStateCacheFilename(blockRoot libcommon.Hash) string { func (f *forkGraphDisk) readBeaconStateFromDisk(blockRoot libcommon.Hash) (bs *state.CachingBeaconState, err error) { var file afero.File file, err = f.fs.Open(getBeaconStateFilename(blockRoot)) - if err != nil { return } defer file.Close() + + if f.sszSnappyReader == nil { + f.sszSnappyReader = snappy.NewReader(file) + } else { + f.sszSnappyReader.Reset(file) + } // Read the version v := []byte{0} - if _, err := file.Read(v); err != nil { + if _, err := f.sszSnappyReader.Read(v); err != nil { return nil, fmt.Errorf("failed to read hard fork version: %w, root: %x", err, blockRoot) } // Read the length lengthBytes := make([]byte, 8) var n int - n, err = io.ReadFull(file, lengthBytes) + n, err = io.ReadFull(f.sszSnappyReader, lengthBytes) if err != nil { return nil, fmt.Errorf("failed to read length: %w, root: %x", err, blockRoot) } if n != 8 { return nil, fmt.Errorf("failed to read length: %d, want 8, root: %x", n, blockRoot) } - // Grow the snappy buffer - f.sszSnappyBuffer.Grow(int(binary.BigEndian.Uint64(lengthBytes))) - // Read the snappy buffer - sszSnappyBuffer := f.sszSnappyBuffer.Bytes() - sszSnappyBuffer = sszSnappyBuffer[:cap(sszSnappyBuffer)] - n, err = io.ReadFull(file, sszSnappyBuffer) - if err != nil && !errors.Is(err, io.ErrUnexpectedEOF) { - return nil, fmt.Errorf("failed to read snappy buffer: %w, root: %x", err, blockRoot) - } - decLen, err := snappy.DecodedLen(sszSnappyBuffer[:n]) - if err != nil { - return nil, fmt.Errorf("failed to get decoded length: %w, root: %x, len: %d", err, blockRoot, n) - } - // Grow the plain ssz buffer - f.sszBuffer.Grow(decLen) - sszBuffer := f.sszBuffer.Bytes() - sszBuffer, err = snappy.Decode(sszBuffer, sszSnappyBuffer[:n]) + f.sszBuffer = f.sszBuffer[:binary.BigEndian.Uint64(lengthBytes)] + n, err = io.ReadFull(f.sszSnappyReader, f.sszBuffer) if err != nil { - return nil, fmt.Errorf("failed to decode snappy buffer: %w, root: %x, len: %d, decLen: %d", err, blockRoot, n, decLen) + return nil, fmt.Errorf("failed to read snappy buffer: %w, root: %x", err, blockRoot) } + f.sszBuffer = f.sszBuffer[:n] + bs = state.New(f.beaconCfg) - if err = bs.DecodeSSZ(sszBuffer, int(v[0])); err != nil { - return nil, fmt.Errorf("failed to decode beacon state: %w, root: %x, len: %d, decLen: %d, bs: %+v", err, blockRoot, n, decLen, bs) + if err = bs.DecodeSSZ(f.sszBuffer, int(v[0])); err != nil { + return nil, fmt.Errorf("failed to decode beacon state: %w, root: %x, len: %d, decLen: %d, bs: %+v", err, blockRoot, n, len(f.sszBuffer), bs) } // decode the cache file cacheFile, err := f.fs.Open(getBeaconStateCacheFilename(blockRoot)) @@ -106,47 +97,42 @@ func (f *forkGraphDisk) DumpBeaconStateOnDisk(blockRoot libcommon.Hash, bs *stat return } // Truncate and then grow the buffer to the size of the state. - encodingSizeSSZ := bs.EncodingSizeSSZ() - f.sszBuffer.Grow(encodingSizeSSZ) - f.sszBuffer.Reset() - - sszBuffer := f.sszBuffer.Bytes() - sszBuffer, err = bs.EncodeSSZ(sszBuffer) + f.sszBuffer, err = bs.EncodeSSZ(f.sszBuffer[:0]) if err != nil { return } - // Grow the snappy buffer - f.sszSnappyBuffer.Grow(snappy.MaxEncodedLen(len(sszBuffer))) - // Compress the ssz buffer - sszSnappyBuffer := f.sszSnappyBuffer.Bytes() - sszSnappyBuffer = sszSnappyBuffer[:cap(sszSnappyBuffer)] - sszSnappyBuffer = snappy.Encode(sszSnappyBuffer, sszBuffer) + var dumpedFile afero.File dumpedFile, err = f.fs.OpenFile(getBeaconStateFilename(blockRoot), os.O_TRUNC|os.O_CREATE|os.O_RDWR, 0o755) if err != nil { return } defer dumpedFile.Close() + + if f.sszSnappyWriter == nil { + f.sszSnappyWriter = snappy.NewBufferedWriter(dumpedFile) + } else { + f.sszSnappyWriter.Reset(dumpedFile) + } + // First write the hard fork version - _, err = dumpedFile.Write([]byte{byte(bs.Version())}) - if err != nil { - return + if _, err := f.sszSnappyWriter.Write([]byte{byte(bs.Version())}); err != nil { + return err } // Second write the length length := make([]byte, 8) - binary.BigEndian.PutUint64(length, uint64(len(sszSnappyBuffer))) - _, err = dumpedFile.Write(length) - if err != nil { - return + binary.BigEndian.PutUint64(length, uint64(len(f.sszBuffer))) + if _, err := f.sszSnappyWriter.Write(length); err != nil { + return err } // Lastly dump the state - _, err = dumpedFile.Write(sszSnappyBuffer) - if err != nil { + if _, err := f.sszSnappyWriter.Write(f.sszBuffer); err != nil { + return err + } + if err = f.sszSnappyWriter.Flush(); err != nil { return } - - err = dumpedFile.Sync() - if err != nil { + if err = dumpedFile.Sync(); err != nil { return }