From be08ccd16afb45d3a61202152288b55de9862712 Mon Sep 17 00:00:00 2001 From: Huy Ngo Date: Tue, 17 Sep 2024 17:38:19 +0700 Subject: [PATCH] rawdb,ethdb,eth: implement freezer tail deletion and use atomic reference commit 538a868 fix up --- core/blockchain.go | 6 +- core/rawdb/database.go | 13 +- core/rawdb/freezer.go | 72 ++++++-- core/rawdb/freezer_batch.go | 1 + core/rawdb/freezer_meta.go | 112 ++++++++++++ core/rawdb/freezer_meta_test.go | 61 +++++++ core/rawdb/freezer_table.go | 287 ++++++++++++++++++++++++------- core/rawdb/freezer_table_test.go | 4 +- core/rawdb/freezer_test.go | 6 +- core/rawdb/freezer_utils.go | 125 ++++++++++++++ core/rawdb/freezer_utils_test.go | 75 ++++++++ core/rawdb/table.go | 11 +- core/state/pruner/pruner.go | 6 +- ethdb/database.go | 26 ++- 14 files changed, 700 insertions(+), 105 deletions(-) create mode 100644 core/rawdb/freezer_meta.go create mode 100644 core/rawdb/freezer_meta_test.go create mode 100644 core/rawdb/freezer_utils.go create mode 100644 core/rawdb/freezer_utils_test.go diff --git a/core/blockchain.go b/core/blockchain.go index f93c9fc5f7..7a151ca0b3 100644 --- a/core/blockchain.go +++ b/core/blockchain.go @@ -748,7 +748,7 @@ func (bc *BlockChain) setHeadBeyondRoot(head uint64, root common.Hash, repair bo if num+1 <= frozen { // Truncate all relative data(header, total difficulty, body, receipt // and canonical hash) from ancient store. - if err := bc.db.TruncateAncients(num); err != nil { + if err := bc.db.TruncateHead(num); err != nil { log.Crit("Failed to truncate ancient data", "number", num, "err", err) } // Remove the hash <-> number mapping from the active store. @@ -1185,7 +1185,7 @@ func (bc *BlockChain) InsertReceiptChain(blockChain types.Blocks, receiptChain [ // The tx index data could not be written. // Roll back the ancient store update. fastBlock := bc.CurrentFastBlock().NumberU64() - if err := bc.db.TruncateAncients(fastBlock + 1); err != nil { + if err := bc.db.TruncateHead(fastBlock + 1); err != nil { log.Error("Can't truncate ancient store after failed insert", "err", err) } return 0, err @@ -1201,7 +1201,7 @@ func (bc *BlockChain) InsertReceiptChain(blockChain types.Blocks, receiptChain [ if !updateHead(blockChain[len(blockChain)-1]) { // We end up here if the header chain has reorg'ed, and the blocks/receipts // don't match the canonical chain. - if err := bc.db.TruncateAncients(previousFastBlock + 1); err != nil { + if err := bc.db.TruncateHead(previousFastBlock + 1); err != nil { log.Error("Can't truncate ancient store after failed insert", "err", err) } return 0, errSideChainReceipts diff --git a/core/rawdb/database.go b/core/rawdb/database.go index e2e06cff8b..6a08932208 100644 --- a/core/rawdb/database.go +++ b/core/rawdb/database.go @@ -123,13 +123,18 @@ func (db *nofreezedb) ModifyAncients(func(ethdb.AncientWriteOp) error) (int64, e return 0, errNotSupported } -// TruncateAncients returns an error as we don't have a backing chain freezer. -func (db *nofreezedb) TruncateAncients(items uint64) error { +// Sync returns an error as we don't have a backing chain freezer. +func (db *nofreezedb) Sync() error { return errNotSupported } -// Sync returns an error as we don't have a backing chain freezer. -func (db *nofreezedb) Sync() error { +// TruncateHead returns an error as we don't have a backing chain freezer. +func (db *nofreezedb) TruncateHead(items uint64) error { + return errNotSupported +} + +// TruncateTail returns an error as we don't have a backing chain freezer. +func (db *nofreezedb) TruncateTail(items uint64) error { return errNotSupported } diff --git a/core/rawdb/freezer.go b/core/rawdb/freezer.go index 538a799b5c..1a96aff6d7 100644 --- a/core/rawdb/freezer.go +++ b/core/rawdb/freezer.go @@ -52,11 +52,11 @@ var ( const ( - // freezerTableSize defines the maximum size of freezer data files. + // freezerTableSize defines the maximum size of freezer data files, max size of per file is 2GB. freezerTableSize = 2 * 1000 * 1000 * 1000 ) -// freezer is an memory mapped append-only database to store immutable chain data +// freezer is a memory mapped append-only database to store immutable chain data // into flat files: // // - The append only nature ensures that disk writes are minimized. @@ -65,6 +65,7 @@ const ( // of Geth, and thus also GC overhead. type Freezer struct { frozen atomic.Uint64 // Number of items already frozen + tail atomic.Uint64 // Number of the first stored item in the freezer threshold atomic.Uint64 // Number of recent blocks not to freeze (params.FullImmutabilityThreshold apart from tests) // This lock synchronizes writers and the truncate operation, as well as @@ -116,6 +117,8 @@ func NewFreezer(datadir string, namespace string, readonly bool, maxTableSize ui trigger: make(chan chan struct{}), quit: make(chan struct{}), } + // The number of blocks after which a chain segment is + // considered immutable (i.e. soft finality) freezer.threshold.Store(params.FullImmutabilityThreshold) // Create the tables. @@ -131,7 +134,7 @@ func NewFreezer(datadir string, namespace string, readonly bool, maxTableSize ui freezer.tables[name] = table } - // Truncate all tables to common length. + // Truncate all tables to common length, then close if err := freezer.repair(); err != nil { for _, table := range freezer.tables { table.Close() @@ -219,10 +222,9 @@ func (f *Freezer) AncientSize(kind string) (uint64, error) { return 0, errUnknownTable } -// Tail returns an error as we don't have a backing chain freezer. +// Tail returns the number of first stored item in the freezer. func (f *Freezer) Tail() (uint64, error) { - // return f.tail.Load(), nil, in the next implementing, right now just keep it zero - return 0, nil + return f.tail.Load(), nil } // ReadAncients runs the given read operation while ensuring that no writes take place @@ -247,7 +249,7 @@ func (f *Freezer) ModifyAncients(fn func(ethdb.AncientWriteOp) error) (writeSize if err != nil { // The write operation has failed. Go back to the previous item position. for name, table := range f.tables { - err := table.truncate(prevItem) + err := table.truncateHead(prevItem) if err != nil { log.Error("Freezer table roll-back failed", "table", name, "index", prevItem, "err", err) } @@ -267,19 +269,20 @@ func (f *Freezer) ModifyAncients(fn func(ethdb.AncientWriteOp) error) (writeSize return writeSize, nil } -// TruncateAncients discards any recent data above the provided threshold number. -func (f *Freezer) TruncateAncients(items uint64) error { +// TruncateHead discards any recent data above the provided threshold number, only keep the first items ancient data. +func (f *Freezer) TruncateHead(items uint64) error { if f.readonly { return errReadOnly } f.writeLock.Lock() defer f.writeLock.Unlock() + // If the current frozen number is less than the requested items for frozen, do nothing. if f.frozen.Load() <= items { return nil } for _, table := range f.tables { - if err := table.truncate(items); err != nil { + if err := table.truncateHead(items); err != nil { return err } } @@ -287,6 +290,28 @@ func (f *Freezer) TruncateAncients(items uint64) error { return nil } +// TruncateTail discards any recent data below the provided threshold number, only keep the last items ancient data. +func (f *Freezer) TruncateTail(tail uint64) error { + if f.readonly { + return errReadOnly + } + f.writeLock.Lock() + defer f.writeLock.Unlock() + + // If the current tail number is greater than the requested tail, seem out of range for truncating, do nothing. + if f.tail.Load() >= tail { + return nil + } + + for _, table := range f.tables { + if err := table.truncateTail(tail); err != nil { + return err + } + } + f.tail.Store(tail) + return nil +} + // Sync flushes all data tables to disk. func (f *Freezer) Sync() error { var errs []error @@ -303,18 +328,35 @@ func (f *Freezer) Sync() error { // repair truncates all data tables to the same length. func (f *Freezer) repair() error { - min := uint64(math.MaxUint64) + var ( + head = uint64(math.MaxUint64) + tail = uint64(0) + ) + // Looping through all tables to find the most common head and tail between tables for _, table := range f.tables { items := table.items.Load() - if min > items { - min = items + + if head > items { + head = items + } + hidden := table.itemHidden.Load() + if hidden > tail { + tail = hidden } } + + // Truncate all tables to the common head and tail. for _, table := range f.tables { - if err := table.truncate(min); err != nil { + if err := table.truncateHead(head); err != nil { + return err + } + + if err := table.truncateTail(tail); err != nil { return err } } - f.frozen.Store(min) + // Update frozen and tail counters. + f.frozen.Store(head) + f.tail.Store(tail) return nil } diff --git a/core/rawdb/freezer_batch.go b/core/rawdb/freezer_batch.go index dfb16a58e1..e143dba2d4 100644 --- a/core/rawdb/freezer_batch.go +++ b/core/rawdb/freezer_batch.go @@ -165,6 +165,7 @@ func (batch *freezerTableBatch) appendItem(data []byte) error { batch.totalBytes += itemSize // Put index entry to buffer. + // The index file contains a list of index entries. entry := indexEntry{filenum: batch.t.headId, offset: uint32(itemOffset + itemSize)} batch.indexBuffer = entry.append(batch.indexBuffer) batch.curItem++ diff --git a/core/rawdb/freezer_meta.go b/core/rawdb/freezer_meta.go new file mode 100644 index 0000000000..3eed366a7b --- /dev/null +++ b/core/rawdb/freezer_meta.go @@ -0,0 +1,112 @@ +// Copyright 2022 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see + +package rawdb + +import ( + "io" + "os" + + "github.com/ethereum/go-ethereum/log" + "github.com/ethereum/go-ethereum/rlp" +) + +const freezerVersion = 1 // The initial version tag of freezer table metadata + +// freezerTableMeta wraps all the metadata of the freezer table. +type freezerTableMeta struct { + // Version is the versioning descriptor of the freezer table. + Version uint16 + + // VirtualTail indicates how many items have been marked as deleted. + // Its value is equal to the number of items removed from the table + // plus the number of items hidden in the table, so it should never + // be lower than the "actual tail". + VirtualTail uint64 +} + +// newMetadata initializes the metadata object with the given virtual tail. +func newMetadata(tail uint64) *freezerTableMeta { + return &freezerTableMeta{ + Version: freezerVersion, + VirtualTail: tail, + } +} + +// readMetadata reads the metadata of the freezer table from the +// given metadata file. +func readMetadata(file *os.File) (*freezerTableMeta, error) { + _, err := file.Seek(0, io.SeekStart) // SeekStart means the origin of the file + if err != nil { + return nil, err + } + var meta freezerTableMeta + if err := rlp.Decode(file, &meta); err != nil { + return nil, err + } + return &meta, nil +} + +// writeMetadata writes the metadata of the freezer table into the +// given metadata file. +func writeMetadata(file *os.File, meta *freezerTableMeta) error { + _, err := file.Seek(0, io.SeekStart) + if err != nil { + return err + } + return rlp.Encode(file, meta) +} + +// loadMetadata loads the metadata from the given metadata file. +// Initializes the metadata file with the given "actual tail" if +// it's empty. +func loadMetadata(file *os.File, tail uint64) (*freezerTableMeta, error) { + stat, err := file.Stat() + if err != nil { + return nil, err + } + + // Write the metadata with the given actual tail into metadata file + // if it's non-existent. There are two possible scenarios here: + // - the freezer table is empty + // - the freezer table is legacy + // In both cases, write the meta into the file with the actual tail + // as the virtual tail. + if stat.Size() == 0 { // The file is empty + m := newMetadata(tail) + if err := writeMetadata(file, m); err != nil { + return nil, err + } + return m, nil + } + + // If the file is not empty, read the metadata from the file. + m, err := readMetadata(file) + if err != nil { + return nil, err + } + // Update the virtual tail with the given actual tail if it's even + // lower than it. Theoretically it shouldn't happen at all, print + // a warning here. + if m.VirtualTail < tail { + log.Warn("Updated virtual tail", "have", m.VirtualTail, "now", tail) + m.VirtualTail = tail + if err := writeMetadata(file, m); err != nil { + return nil, err + } + } + return m, nil +} diff --git a/core/rawdb/freezer_meta_test.go b/core/rawdb/freezer_meta_test.go new file mode 100644 index 0000000000..191744a754 --- /dev/null +++ b/core/rawdb/freezer_meta_test.go @@ -0,0 +1,61 @@ +// Copyright 2022 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see + +package rawdb + +import ( + "io/ioutil" + "os" + "testing" +) + +func TestReadWriteFreezerTableMeta(t *testing.T) { + f, err := ioutil.TempFile(os.TempDir(), "*") + if err != nil { + t.Fatalf("Failed to create file %v", err) + } + err = writeMetadata(f, newMetadata(100)) + if err != nil { + t.Fatalf("Failed to write metadata %v", err) + } + meta, err := readMetadata(f) + if err != nil { + t.Fatalf("Failed to read metadata %v", err) + } + if meta.Version != freezerVersion { + t.Fatalf("Unexpected version field") + } + if meta.VirtualTail != uint64(100) { + t.Fatalf("Unexpected virtual tail field") + } +} + +func TestInitializeFreezerTableMeta(t *testing.T) { + f, err := ioutil.TempFile(os.TempDir(), "*") + if err != nil { + t.Fatalf("Failed to create file %v", err) + } + meta, err := loadMetadata(f, uint64(100)) + if err != nil { + t.Fatalf("Failed to read metadata %v", err) + } + if meta.Version != freezerVersion { + t.Fatalf("Unexpected version field") + } + if meta.VirtualTail != uint64(100) { + t.Fatalf("Unexpected virtual tail field") + } +} diff --git a/core/rawdb/freezer_table.go b/core/rawdb/freezer_table.go index d15d443943..81e8a3155f 100644 --- a/core/rawdb/freezer_table.go +++ b/core/rawdb/freezer_table.go @@ -50,17 +50,16 @@ var ( // offset within the file to the end of the data // In serialized form, the filenum is stored as uint16. type indexEntry struct { - filenum uint32 // stored as uint16 ( 2 bytes) - offset uint32 // stored as uint32 ( 4 bytes) + filenum uint32 // stored as uint16 ( 2 bytes ) + offset uint32 // stored as uint32 ( 4 bytes ) } -const indexEntrySize = 6 +const indexEntrySize = 6 // filenum + offset // unmarshalBinary deserializes binary b into the rawIndex entry. -func (i *indexEntry) unmarshalBinary(b []byte) error { +func (i *indexEntry) unmarshalBinary(b []byte) { i.filenum = uint32(binary.BigEndian.Uint16(b[:2])) i.offset = binary.BigEndian.Uint32(b[2:6]) - return nil } // append adds the encoded entry to the end of b. @@ -92,16 +91,24 @@ type freezerTable struct { items atomic.Uint64 // Number of items stored in the table (including items removed from tail) itemOffset atomic.Uint64 // Number of items removed from the table + // itemHidden is the number of items marked as deleted. Tail deletion is + // only supported at file level which means the actual deletion will be + // delayed until the entire data file is marked as deleted. Before that + // these items will be hidden to prevent being visited again. The value + // should never be lower than itemOffset. + itemHidden atomic.Uint64 + noCompression bool // if true, disables snappy compression. Note: does not work retroactively maxFileSize uint32 // Max file size for data-files name string path string head *os.File // File descriptor for the data head of the table + index *os.File // File descriptor for the indexEntry file of the table + meta *os.File // File descriptor for the metadata file of the table files map[uint32]*os.File // open files headId uint32 // number of the currently active head file tailId uint32 // number of the earliest file - index *os.File // File descriptor for the indexEntry file of the table headBytes int64 // Number of bytes written to the head file readMeter metrics.Meter // Meter for measuring the effective amount of data read @@ -117,47 +124,9 @@ func newFreezerTable(path, name string, disableSnappy bool) (*freezerTable, erro return newTable(path, name, metrics.NilMeter{}, metrics.NilMeter{}, metrics.NilGauge{}, freezerTableSize, disableSnappy) } -// openFreezerFileForAppend opens a freezer table file and seeks to the end -func openFreezerFileForAppend(filename string) (*os.File, error) { - // Open the file without the O_APPEND flag - // because it has differing behaviour during Truncate operations - // on different OS's - file, err := os.OpenFile(filename, os.O_RDWR|os.O_CREATE, 0644) - if err != nil { - return nil, err - } - // Seek to end for append - if _, err = file.Seek(0, io.SeekEnd); err != nil { - return nil, err - } - return file, nil -} - -// openFreezerFileForReadOnly opens a freezer table file for read only access -func openFreezerFileForReadOnly(filename string) (*os.File, error) { - return os.OpenFile(filename, os.O_RDONLY, 0644) -} - -// openFreezerFileTruncated opens a freezer table making sure it is truncated -func openFreezerFileTruncated(filename string) (*os.File, error) { - return os.OpenFile(filename, os.O_RDWR|os.O_CREATE|os.O_TRUNC, 0644) -} - -// truncateFreezerFile resizes a freezer table file and seeks to the end -func truncateFreezerFile(file *os.File, size int64) error { - if err := file.Truncate(size); err != nil { - return err - } - // Seek to end for append - if _, err := file.Seek(0, io.SeekEnd); err != nil { - return err - } - return nil -} - // newTable opens a freezer table, creating the data and index files if they are // non existent. Both files are truncated to the shortest common length to ensure -// they don't go out of sync. +// they don't go out of sync. (Table name could be bodies, receipts, etc.) func newTable(path string, name string, readMeter metrics.Meter, writeMeter metrics.Meter, sizeGauge metrics.Gauge, maxFilesize uint32, noCompression bool) (*freezerTable, error) { // Ensure the containing directory exists and open the indexEntry file if err := os.MkdirAll(path, 0755); err != nil { @@ -171,13 +140,24 @@ func newTable(path string, name string, readMeter metrics.Meter, writeMeter metr // Compressed idx idxName = fmt.Sprintf("%s.cidx", name) } - offsets, err := openFreezerFileForAppend(filepath.Join(path, idxName)) + var ( + err error + index *os.File + meta *os.File + ) + index, err = openFreezerFileForAppend(filepath.Join(path, idxName)) + if err != nil { + return nil, err + } + meta, err = openFreezerFileForAppend(filepath.Join(path, fmt.Sprintf("%s.meta", name))) if err != nil { return nil, err } + // Create the table and repair any past inconsistency tab := &freezerTable{ - index: offsets, + index: index, + meta: meta, files: make(map[uint32]*os.File), readMeter: readMeter, writeMeter: writeMeter, @@ -244,8 +224,20 @@ func (t *freezerTable) repair() error { t.tailId = firstIndex.filenum t.itemOffset.Store(uint64(firstIndex.offset)) - t.index.ReadAt(buffer, offsetsSize-indexEntrySize) - lastIndex.unmarshalBinary(buffer) + // Load metadata from the file + meta, err := loadMetadata(t.meta, t.itemOffset.Load()) + if err != nil { + return err + } + t.itemHidden.Store(meta.VirtualTail) + + // Read the last index, use the default value in case the freezer is empty + if offsetsSize == indexEntrySize { + lastIndex = indexEntry{filenum: t.tailId, offset: 0} + } else { + t.index.ReadAt(buffer, offsetsSize-indexEntrySize) + lastIndex.unmarshalBinary(buffer) + } t.head, err = t.openFile(lastIndex.filenum, openFreezerFileForAppend) if err != nil { return err @@ -274,9 +266,15 @@ func (t *freezerTable) repair() error { return err } offsetsSize -= indexEntrySize - t.index.ReadAt(buffer, offsetsSize-indexEntrySize) + // Read the new head index, use the default value in case + // the freezer is already empty. var newLastIndex indexEntry - newLastIndex.unmarshalBinary(buffer) + if offsetsSize == indexEntrySize { + newLastIndex = indexEntry{filenum: t.tailId, offset: 0} + } else { + t.index.ReadAt(buffer, offsetsSize-indexEntrySize) + newLastIndex.unmarshalBinary(buffer) + } // We might have slipped back into an earlier head-file here if newLastIndex.filenum != lastIndex.filenum { // Release earlier opened file @@ -302,11 +300,19 @@ func (t *freezerTable) repair() error { if err := t.head.Sync(); err != nil { return err } + if err := t.meta.Sync(); err != nil { + return err + } // Update the item and byte counters and return t.items.Store(t.itemOffset.Load() + uint64(offsetsSize/indexEntrySize-1)) // last indexEntry points to the end of the data file t.headBytes = contentSize t.headId = lastIndex.filenum + // Delete the leftover files because of head deletion + t.releaseFilesAfter(t.headId, true) + + // Delete the leftover files because of tail deletion + t.releaseFilesBefore(t.tailId, true) // Close opened files and preopen all files if err := t.preopen(); err != nil { return err @@ -333,16 +339,20 @@ func (t *freezerTable) preopen() (err error) { return err } -// truncate discards any recent data above the provided threshold number. -func (t *freezerTable) truncate(items uint64) error { +// truncateHead discards any recent data above the provided threshold number. +func (t *freezerTable) truncateHead(items uint64) error { t.lock.Lock() defer t.lock.Unlock() - // If our item count is correct, don't do anything + // Ensure the given truncate target must be within the existing range. existing := t.items.Load() if existing <= items { return nil } + // Ensure the given truncate target must be above the hidden items. + if items < t.itemHidden.Load() { + return errors.New("truncation below tail") + } // We need to truncate, save the old size for metrics tracking oldSize, err := t.sizeNolock() if err != nil { @@ -354,17 +364,25 @@ func (t *freezerTable) truncate(items uint64) error { log = t.logger.Warn // Only loud warn if we delete multiple items } log("Truncating freezer table", "items", existing, "limit", items) - if err := truncateFreezerFile(t.index, int64(items+1)*indexEntrySize); err != nil { - return err - } + + // Truncate the index file first, the tail position is also considered + // when calculating the new freezer table length. // Calculate the new expected size of the data file and truncate it - buffer := make([]byte, indexEntrySize) - if _, err := t.index.ReadAt(buffer, int64(items*indexEntrySize)); err != nil { + length := items - t.itemOffset.Load() + if err := truncateFreezerFile(t.index, int64(length+1)*indexEntrySize); err != nil { return err } var expected indexEntry - expected.unmarshalBinary(buffer) + if length == 0 { + expected = indexEntry{filenum: t.tailId, offset: 0} + } else { + buffer := make([]byte, indexEntrySize) + if _, err := t.index.ReadAt(buffer, int64(length*indexEntrySize)); err != nil { + return err + } + expected.unmarshalBinary(buffer) + } // We might need to truncate back to older files if expected.filenum != t.headId { // If already open for reading, force-reopen for writing @@ -397,6 +415,117 @@ func (t *freezerTable) truncate(items uint64) error { return nil } +// truncateTail discards any recent data before the provided threshold number. +// tail -> item-offset -> item-hidden -> truncated-items -> items/head. (Valid Range). +func (t *freezerTable) truncateTail(items uint64) error { + t.lock.Lock() + defer t.lock.Unlock() + + // The truncateTarget is below the current tail, return nil, no need to truncate + if t.itemHidden.Load() >= items { + return nil + } + // The truncateTarget is above the current head, return error + if t.items.Load() < items { + return errors.New("truncation above head") + } + + // Load the new tail index by the given new tail position + var ( + newTailId uint32 + buffer = make([]byte, indexEntrySize) + ) + + if t.items.Load() == items { + newTailId = t.headId // Truncate in the head. + } else { + // Get the index entry of the new tail position and it's it based on the file number. + offset := items - t.itemOffset.Load() + if _, err := t.index.ReadAt(buffer, int64((offset+1)*indexEntrySize)); err != nil { + return err + } + var newTailIndex indexEntry + newTailIndex.unmarshalBinary(buffer) + newTailId = newTailIndex.filenum + } + // Update the virtual tail marker and hidden these entries in table. + t.itemHidden.Store(items) + if err := writeMetadata(t.meta, newMetadata(items)); err != nil { + return err + } + // Hidden items still fall in the current tail file, no data file + // can be dropped. + if t.tailId == newTailId { + return nil + } + // Hidden items fall in the incorrect range, returns the error. + if t.tailId > newTailId { + return fmt.Errorf("invalid index, tail-file %d, item-file %d", t.tailId, newTailId) + } + // Hidden items exceed the current tail file, drop the relevant + // data files. We need to truncate, save the old size for metrics + // tracking. + oldSize, err := t.sizeNolock() + if err != nil { + return err + } + // Count how many items can be deleted from the file. + var ( + newDeleted = items + deleted = t.itemOffset.Load() + ) + for current := items - 1; current >= deleted; current -= 1 { + if _, err := t.index.ReadAt(buffer, int64((current-deleted+1)*indexEntrySize)); err != nil { + return err + } + var pre indexEntry + pre.unmarshalBinary(buffer) + if pre.filenum != newTailId { + break + } + newDeleted = current + } + // Commit the changes of metadata file first before manipulating + // the indexes file. + if err := t.meta.Sync(); err != nil { + return err + } + // Truncate the deleted index entries from the index file. It overwrites the entries in current index file. + err = copyFrom(t.index.Name(), t.index.Name(), indexEntrySize*(newDeleted-deleted+1), func(f *os.File) error { + tailIndex := indexEntry{ + filenum: newTailId, + offset: uint32(newDeleted), + } + _, err := f.Write(tailIndex.append(nil)) + return err + }) + if err != nil { + return err + } + // Reopen the modified index file to load the changes + if err := t.index.Close(); err != nil { + return err + } + t.index, err = openFreezerFileForAppend(t.index.Name()) + if err != nil { + return err + } + // Release/Delete any files before the current tail + t.tailId = newTailId + t.itemOffset.Store(newDeleted) + + // Release with removing any files before the current tailId + t.releaseFilesBefore(t.tailId, true) + + // Retrieve the new size and update the total size counter + newSize, err := t.sizeNolock() + if err != nil { + return err + } + t.sizeGauge.Dec(int64(oldSize - newSize)) + return nil +} + // Close closes all opened files. func (t *freezerTable) Close() error { t.lock.Lock() @@ -408,6 +537,11 @@ func (t *freezerTable) Close() error { } t.index = nil + if err := t.meta.Close(); err != nil { + errs = append(errs, err) + } + t.meta = nil + for _, f := range t.files { if err := f.Close(); err != nil { errs = append(errs, err) @@ -421,6 +555,19 @@ func (t *freezerTable) Close() error { return nil } +// releaseFilesBefore closes all open files with a lower number, and optionally also deletes the files +func (t *freezerTable) releaseFilesBefore(num uint32, remove bool) { + for fnum, f := range t.files { + if fnum < num { + delete(t.files, fnum) + f.Close() + if remove { + os.Remove(f.Name()) + } + } + } +} + // openFile assumes that the write-lock is held by the caller func (t *freezerTable) openFile(num uint32, opener func(string) (*os.File, error)) (f *os.File, err error) { var exist bool @@ -559,14 +706,15 @@ func (t *freezerTable) retrieveItems(start, count, maxBytes uint64) ([]byte, []i if t.index == nil || t.head == nil { return nil, nil, errClosed } - itemCount := t.items.Load() // max number + items := t.items.Load() // max number + hidden := t.itemHidden.Load() // Ensure the start is written, not deleted from the tail, and that the // caller actually wants something - if itemCount <= start || t.itemOffset.Load() > start || count == 0 { + if items <= start || hidden > start || count == 0 { return nil, nil, errOutOfBounds } - if start+count > itemCount { - count = itemCount - start + if start+count > items { + count = items - start } var ( output = make([]byte, maxBytes) // Buffer to read data into @@ -645,7 +793,7 @@ func (t *freezerTable) retrieveItems(start, count, maxBytes uint64) ([]byte, []i // has returns an indicator whether the specified number data // exists in the freezer table. func (t *freezerTable) has(number uint64) bool { - return t.items.Load() > number + return t.items.Load() > number && t.itemHidden.Load() <= number } // size returns the total data size in the freezer table. @@ -716,13 +864,20 @@ func (t *freezerTable) dumpIndexString(start, stop int64) string { } func (t *freezerTable) dumpIndex(w io.Writer, start, stop int64) { + meta, err := readMetadata(t.meta) + if err != nil { + fmt.Fprintf(w, "Failed to decode freezer table %v\n", err) + return + } + fmt.Fprintf(w, "Version %d deleted %d, hidden %d\n", meta.Version, t.itemOffset.Load(), t.itemHidden.Load()) + buf := make([]byte, indexEntrySize) fmt.Fprintf(w, "| number | fileno | offset |\n") fmt.Fprintf(w, "|--------|--------|--------|\n") for i := uint64(start); ; i++ { - if _, err := t.index.ReadAt(buf, int64(i*indexEntrySize)); err != nil { + if _, err := t.index.ReadAt(buf, int64((i+1)*indexEntrySize)); err != nil { break } var entry indexEntry diff --git a/core/rawdb/freezer_table_test.go b/core/rawdb/freezer_table_test.go index 57ddde49f7..edbfa15687 100644 --- a/core/rawdb/freezer_table_test.go +++ b/core/rawdb/freezer_table_test.go @@ -387,7 +387,7 @@ func TestFreezerTruncate(t *testing.T) { t.Fatal(err) } defer f.Close() - f.truncate(10) // 150 bytes + f.truncateHead(10) // 150 bytes if f.items.Load() != 10 { t.Fatalf("expected %d items, got %d", 10, f.items.Load()) } @@ -504,7 +504,7 @@ func TestFreezerReadAndTruncate(t *testing.T) { } // Now, truncate back to zero - f.truncate(0) + f.truncateHead(0) // Write the data again batch := f.newBatch() diff --git a/core/rawdb/freezer_test.go b/core/rawdb/freezer_test.go index 474650e00d..418e4ae5b1 100644 --- a/core/rawdb/freezer_test.go +++ b/core/rawdb/freezer_test.go @@ -186,7 +186,7 @@ func TestFreezerConcurrentModifyRetrieve(t *testing.T) { wg.Wait() } -// This test runs ModifyAncients and TruncateAncients concurrently with each other. +// This test runs ModifyAncients and TruncateHead concurrently with each other. func TestFreezerConcurrentModifyTruncate(t *testing.T) { f, dir := newFreezerForTesting(t, freezerTestTableDef) defer os.RemoveAll(dir) @@ -196,7 +196,7 @@ func TestFreezerConcurrentModifyTruncate(t *testing.T) { for i := 0; i < 1000; i++ { // First reset and write 100 items. - if err := f.TruncateAncients(0); err != nil { + if err := f.TruncateHead(0); err != nil { t.Fatal("truncate failed:", err) } _, err := f.ModifyAncients(func(op ethdb.AncientWriteOp) error { @@ -231,7 +231,7 @@ func TestFreezerConcurrentModifyTruncate(t *testing.T) { wg.Done() }() go func() { - truncateErr = f.TruncateAncients(10) + truncateErr = f.TruncateHead(10) wg.Done() }() go func() { diff --git a/core/rawdb/freezer_utils.go b/core/rawdb/freezer_utils.go new file mode 100644 index 0000000000..4354f94986 --- /dev/null +++ b/core/rawdb/freezer_utils.go @@ -0,0 +1,125 @@ +// Copyright 2022 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see . + +package rawdb + +import ( + "io" + "io/ioutil" + "os" + "path/filepath" +) + +// copyFrom copies data from 'srcPath' at offset 'offset' into 'destPath'. +// The 'destPath' is created if it doesn't exist, otherwise it is overwritten. +// Before the copy is executed, there is a callback can be registered to +// manipulate the dest file. +// It is perfectly valid to have destPath == srcPath. +// Those paths must be absolute path. +func copyFrom(srcPath, destPath string, offset uint64, beforeCopyFunc func(f *os.File) error) error { + // Create a temp file in the same directory where we want it to wind up + f, err := ioutil.TempFile(filepath.Dir(destPath), "*") // Create random name + if err != nil { + return err + } + + fname := f.Name() + + // Clean up the remaining file. + defer func() { + if f != nil { + f.Close() + } + os.Remove(fname) // Clean up the temp file + }() + + // Apply the beforeCopyFun , before we processing + if beforeCopyFunc != nil { + if err := beforeCopyFunc(f); err != nil { + return err + } + } + // Open the source file + + src, err := os.Open(srcPath) + if err != nil { + return err + } + // Set offset of nextRead in offset relative to origin of the file. + if _, err = src.Seek(int64(offset), 0); err != nil { + src.Close() + return err + } + + // io.Copy uses 32K buffer internally. + _, err = io.Copy(f, src) + if err != nil { + src.Close() + return err + } + // Rename the temporary file to the specified dest name. + // src may be same as dest, so needs to be closed before + // we do the final move. + src.Close() + + if err := f.Close(); err != nil { + return err + } + f = nil + + if err := os.Rename(fname, destPath); err != nil { + return err + } + return nil +} + +// openFreezerFileForAppend opens a freezer table file and seeks to the end, if it's not exist, create it. +func openFreezerFileForAppend(filename string) (*os.File, error) { + // Open the file without the O_APPEND flag + // because it has differing behaviour during Truncate operations + // on different OS's + file, err := os.OpenFile(filename, os.O_RDWR|os.O_CREATE, 0644) + if err != nil { + return nil, err + } + // Seek to end for append + if _, err = file.Seek(0, io.SeekEnd); err != nil { + return nil, err + } + return file, nil +} + +// openFreezerFileForReadOnly opens a freezer table file for read only access +func openFreezerFileForReadOnly(filename string) (*os.File, error) { + return os.OpenFile(filename, os.O_RDONLY, 0644) +} + +// openFreezerFileTruncated opens a freezer table making sure it is truncated +func openFreezerFileTruncated(filename string) (*os.File, error) { + return os.OpenFile(filename, os.O_RDWR|os.O_CREATE|os.O_TRUNC, 0644) +} + +// truncateFreezerFile resizes a freezer table file and seeks to the end +func truncateFreezerFile(file *os.File, size int64) error { + if err := file.Truncate(size); err != nil { + return err + } + // Seek to end for append + if _, err := file.Seek(0, io.SeekEnd); err != nil { + return err + } + return nil +} diff --git a/core/rawdb/freezer_utils_test.go b/core/rawdb/freezer_utils_test.go new file mode 100644 index 0000000000..445f63fb79 --- /dev/null +++ b/core/rawdb/freezer_utils_test.go @@ -0,0 +1,75 @@ +// Copyright 2022 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see . +package rawdb + +import ( + "bytes" + "io/ioutil" + "os" + "testing" +) + +func TestCopyFrom(t *testing.T) { + var ( + content = []byte{0x1, 0x2, 0x3, 0x4, 0x5, 0x6, 0x7, 0x8} + prefix = []byte{0x9, 0xa, 0xb, 0xc, 0xd, 0xf} + ) + var cases = []struct { + src, dest string + offset uint64 + writePrefix bool + }{ + {"foo", "bar", 0, false}, + {"foo", "bar", 1, false}, + {"foo", "bar", 8, false}, + {"foo", "foo", 0, false}, + {"foo", "foo", 1, false}, + {"foo", "foo", 8, false}, + {"foo", "bar", 0, true}, + {"foo", "bar", 1, true}, + {"foo", "bar", 8, true}, + } + for _, c := range cases { + ioutil.WriteFile(c.src, content, 0644) + + if err := copyFrom(c.src, c.dest, c.offset, func(f *os.File) error { + if !c.writePrefix { + return nil + } + f.Write(prefix) + return nil + }); err != nil { + os.Remove(c.src) + t.Fatalf("Failed to copy %v", err) + } + + blob, err := ioutil.ReadFile(c.dest) + if err != nil { + os.Remove(c.src) + os.Remove(c.dest) + t.Fatalf("Failed to read %v", err) + } + want := content[c.offset:] + if c.writePrefix { + want = append(prefix, want...) + } + if !bytes.Equal(blob, want) { + t.Fatal("Unexpected value") + } + os.Remove(c.src) + os.Remove(c.dest) + } +} diff --git a/core/rawdb/table.go b/core/rawdb/table.go index 5e07ec43ad..2672f4ea8d 100644 --- a/core/rawdb/table.go +++ b/core/rawdb/table.go @@ -95,10 +95,15 @@ func (t *table) ReadAncients(fn func(reader ethdb.AncientReaderOp) error) (err e return t.db.ReadAncients(fn) } -// TruncateAncients is a noop passthrough that just forwards the request to the underlying +// TruncateHead is a noop passthrough that just forwards the request to the underlying // database. -func (t *table) TruncateAncients(items uint64) error { - return t.db.TruncateAncients(items) +func (t *table) TruncateHead(items uint64) error { + return t.db.TruncateHead(items) +} + +// TruncateTail is a noop passthrough that just forwards the request to the underlying +func (t *table) TruncateTail(items uint64) error { + return t.db.TruncateTail(items) } // Sync is a noop passthrough that just forwards the request to the underlying diff --git a/core/state/pruner/pruner.go b/core/state/pruner/pruner.go index 96fbbd26b9..559507d7ff 100644 --- a/core/state/pruner/pruner.go +++ b/core/state/pruner/pruner.go @@ -66,9 +66,9 @@ var ( // Pruner is an offline tool to prune the stale state with the // help of the snapshot. The workflow of pruner is very simple: // -// - iterate the snapshot, reconstruct the relevant state -// - iterate the database, delete all other state entries which -// don't belong to the target state and the genesis state +// - iterate the snapshot, reconstruct the relevant state +// - iterate the database, delete all other state entries which +// don't belong to the target state and the genesis state // // It can take several hours(around 2 hours for mainnet) to finish // the whole pruning work. It's recommended to run this offline tool diff --git a/ethdb/database.go b/ethdb/database.go index 6d0e1147a1..a3c5570b53 100644 --- a/ethdb/database.go +++ b/ethdb/database.go @@ -87,12 +87,12 @@ type AncientReaderOp interface { // Ancients returns the ancient item numbers in the ancient store. Ancients() (uint64, error) - // Tail returns the number of first stored item in the ancient store. - // This number can also be interpreted as the total deleted items. - Tail() (uint64, error) - // AncientSize returns the ancient size of the specified category. AncientSize(kind string) (uint64, error) + + // Tail returns the number of first stored item in the freezer + // This number can also be interpreted as the total deleted item numbers (counting from 0) + Tail() (uint64, error) } // AncientReader is the extended ancient reader interface including 'batched' or 'atomic' reading. @@ -111,8 +111,22 @@ type AncientWriter interface { // The integer return value is the total size of the written data. ModifyAncients(func(AncientWriteOp) error) (int64, error) - // TruncateAncients discards all but the first n ancient data from the ancient store. - TruncateAncients(n uint64) error + /* + Tail ------------> Head + */ + + // TruncateHead discards all, but keep the first n ancient data from the ancient store. + // After the truncation, the latest item can be accessed it item_ n-1 (start from 0) + // Tail 0 -> (n-1)New-headxxxxOld-head + TruncateHead(n uint64) error + + // TruncateTail discards the first n ancient data from the ancient store. The already + // deleted items are ignored. After the truncation, the earliest item can be accessed + // is item_n(start from 0). The deleted items may not be removed from the ancient store + // immediately, but only when the accumulated deleted data reach the threshold then + // will be removed all together. + // Old-tail(0)xxxxxxxNew-tail(n)->Head + TruncateTail(n uint64) error // Sync flushes all in-memory ancient store data to disk. Sync() error