diff --git a/core/state/snapshot/generate_test.go b/core/state/snapshot/generate_test.go
index 891111973a5e..8cf47fde04cc 100644
--- a/core/state/snapshot/generate_test.go
+++ b/core/state/snapshot/generate_test.go
@@ -224,6 +224,16 @@ func (t *testHelper) Commit() common.Hash {
 	}
 	t.triedb.Update(root, types.EmptyRootHash, 0, t.nodes, nil)
 	t.triedb.Commit(root, false)
+
+	// re-open the trie database to ensure the frozen buffer
+	// is not referenced
+	config := &triedb.Config{}
+	if t.triedb.Scheme() == rawdb.PathScheme {
+		config.PathDB = &pathdb.Config{} // disable caching
+	} else {
+		config.HashDB = &hashdb.Config{} // disable caching
+	}
+	t.triedb = triedb.NewDatabase(t.triedb.Disk(), config)
 	return root
 }
 
diff --git a/core/state/statedb_test.go b/core/state/statedb_test.go
index 9441834c6a24..625f265e8c58 100644
--- a/core/state/statedb_test.go
+++ b/core/state/statedb_test.go
@@ -976,19 +976,22 @@ func TestMissingTrieNodes(t *testing.T) {
 func testMissingTrieNodes(t *testing.T, scheme string) {
 	// Create an initial state with a few accounts
 	var (
-		tdb   *triedb.Database
-		memDb = rawdb.NewMemoryDatabase()
+		tdb    *triedb.Database
+		memDb  = rawdb.NewMemoryDatabase()
+		openDb = func() *triedb.Database {
+			if scheme == rawdb.PathScheme {
+				return triedb.NewDatabase(memDb, &triedb.Config{PathDB: &pathdb.Config{
+					CleanCacheSize: 0,
+					DirtyCacheSize: 0,
+				}}) // disable caching
+			} else {
+				return triedb.NewDatabase(memDb, &triedb.Config{HashDB: &hashdb.Config{
+					CleanCacheSize: 0,
+				}}) // disable caching
+			}
+		}
 	)
-	if scheme == rawdb.PathScheme {
-		tdb = triedb.NewDatabase(memDb, &triedb.Config{PathDB: &pathdb.Config{
-			CleanCacheSize: 0,
-			DirtyCacheSize: 0,
-		}}) // disable caching
-	} else {
-		tdb = triedb.NewDatabase(memDb, &triedb.Config{HashDB: &hashdb.Config{
-			CleanCacheSize: 0,
-		}}) // disable caching
-	}
+	tdb = openDb()
 	db := NewDatabase(tdb, nil)
 
 	var root common.Hash
@@ -1006,17 +1009,29 @@ func testMissingTrieNodes(t *testing.T, scheme string) {
 		tdb.Commit(root, false)
 	}
 	// Create a new state on the old root
-	state, _ = New(root, db)
 	// Now we clear out the memdb
 	it := memDb.NewIterator(nil, nil)
 	for it.Next() {
 		k := it.Key()
+
 		// Leave the root intact
-		if !bytes.Equal(k, root[:]) {
-			t.Logf("key: %x", k)
-			memDb.Delete(k)
+		if scheme == rawdb.HashScheme {
+			if !bytes.Equal(k, root[:]) {
+				t.Logf("key: %x", k)
+				memDb.Delete(k)
+			}
+		}
+		if scheme == rawdb.PathScheme {
+			rk := k[len(rawdb.TrieNodeAccountPrefix):]
+			if len(rk) != 0 {
+				t.Logf("key: %x", k)
+				memDb.Delete(k)
+			}
 		}
 	}
+	tdb = openDb()
+	db = NewDatabase(tdb, nil)
+	state, _ = New(root, db)
 	balance := state.GetBalance(addr)
 	// The removed elem should lead to it returning zero balance
 	if exp, got := uint64(0), balance.Uint64(); got != exp {
diff --git a/eth/handler.go b/eth/handler.go
index d5117584c001..b28081eef0ec 100644
--- a/eth/handler.go
+++ b/eth/handler.go
@@ -27,7 +27,6 @@ import (
 	"github.com/ethereum/go-ethereum/common"
 	"github.com/ethereum/go-ethereum/core"
 	"github.com/ethereum/go-ethereum/core/forkid"
-	"github.com/ethereum/go-ethereum/core/rawdb"
 	"github.com/ethereum/go-ethereum/core/txpool"
 	"github.com/ethereum/go-ethereum/core/types"
 	"github.com/ethereum/go-ethereum/crypto"
@@ -41,7 +40,6 @@ import (
 	"github.com/ethereum/go-ethereum/metrics"
 	"github.com/ethereum/go-ethereum/p2p"
 	"github.com/ethereum/go-ethereum/p2p/enode"
-	"github.com/ethereum/go-ethereum/triedb/pathdb"
 )
 
 const (
@@ -558,7 +556,4 @@ func (h *handler) enableSyncedFeatures() {
 		log.Info("Snap sync complete, auto disabling")
 		h.snapSync.Store(false)
 	}
-	if h.chain.TrieDB().Scheme() == rawdb.PathScheme {
-		h.chain.TrieDB().SetBufferSize(pathdb.DefaultBufferSize)
-	}
 }
diff --git a/triedb/database.go b/triedb/database.go
index c1e6f9af4e69..40028387d87f 100644
--- a/triedb/database.go
+++ b/triedb/database.go
@@ -314,17 +314,6 @@ func (db *Database) Journal(root common.Hash) error {
 	return pdb.Journal(root)
 }
 
-// SetBufferSize sets the node buffer size to the provided value(in bytes).
-// It's only supported by path-based database and will return an error for
-// others.
-func (db *Database) SetBufferSize(size int) error {
-	pdb, ok := db.backend.(*pathdb.Database)
-	if !ok {
-		return errors.New("not supported")
-	}
-	return pdb.SetBufferSize(size)
-}
-
 // IsVerkle returns the indicator if the database is holding a verkle tree.
 func (db *Database) IsVerkle() bool {
 	return db.config.IsVerkle
diff --git a/triedb/pathdb/database.go b/triedb/pathdb/database.go
index 31e478117cd5..36df506b78d8 100644
--- a/triedb/pathdb/database.go
+++ b/triedb/pathdb/database.go
@@ -44,12 +44,12 @@ const (
 	// support is 4GB, node will panic if batch size exceeds this limit.
 	maxBufferSize = 256 * 1024 * 1024
 
-	// DefaultBufferSize is the default memory allowance of node buffer
+	// defaultBufferSize is the default memory allowance of node buffer
 	// that aggregates the writes from above until it's flushed into the
 	// disk. It's meant to be used once the initial sync is finished.
 	// Do not increase the buffer size arbitrarily, otherwise the system
 	// pause time will increase when the database writes happen.
-	DefaultBufferSize = 64 * 1024 * 1024
+	defaultBufferSize = 64 * 1024 * 1024
 )
 
 var (
@@ -111,7 +111,7 @@ func (c *Config) sanitize() *Config {
 var Defaults = &Config{
 	StateHistory:   params.FullImmutabilityThreshold,
 	CleanCacheSize: defaultCleanSize,
-	DirtyCacheSize: DefaultBufferSize,
+	DirtyCacheSize: defaultBufferSize,
 }
 
 // ReadOnly is the config in order to open database in read only mode.
@@ -341,7 +341,7 @@ func (db *Database) Enable(root common.Hash) error {
 	}
 	// Re-construct a new disk layer backed by persistent state
 	// with **empty clean cache and node buffer**.
-	db.tree.reset(newDiskLayer(root, 0, db, nil, newNodeBuffer(db.bufferSize, nil, 0)))
+	db.tree.reset(newDiskLayer(root, 0, db, nil, newNodeBuffer(db.bufferSize, nil, 0), nil))
 
 	// Re-enable the database as the final step.
 	db.waitSync = false
@@ -440,7 +440,13 @@ func (db *Database) Close() error {
 	db.readOnly = true
 
 	// Release the memory held by clean cache.
-	db.tree.bottom().resetCache()
+	disk := db.tree.bottom()
+	if disk.frozen != nil {
+		if err := disk.frozen.flushed(); err != nil {
+			return err
+		}
+	}
+	disk.resetCache()
 
 	// Close the attached state history freezer.
 	if db.freezer == nil {
@@ -478,19 +484,6 @@ func (db *Database) Initialized(genesisRoot common.Hash) bool {
 	return inited
 }
 
-// SetBufferSize sets the node buffer size to the provided value(in bytes).
-func (db *Database) SetBufferSize(size int) error {
-	db.lock.Lock()
-	defer db.lock.Unlock()
-
-	if size > maxBufferSize {
-		log.Info("Capped node buffer size", "provided", common.StorageSize(size), "adjusted", common.StorageSize(maxBufferSize))
-		size = maxBufferSize
-	}
-	db.bufferSize = size
-	return db.tree.bottom().setBufferSize(db.bufferSize)
-}
-
 // modifyAllowed returns the indicator if mutation is allowed. This function
 // assumes the db.lock is already held.
 func (db *Database) modifyAllowed() error {
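
With `SetBufferSize` removed and `DefaultBufferSize` unexported, the dirty-buffer allowance can now only be chosen when the database is opened, through the config (exactly as the updated tests above do). A minimal sketch of that, using the import paths visible in this patch; the 256 MiB figure is only an example value, and oversized values are capped by the pathdb config sanitizer:

```go
package main

import (
	"github.com/ethereum/go-ethereum/core/rawdb"
	"github.com/ethereum/go-ethereum/triedb"
	"github.com/ethereum/go-ethereum/triedb/pathdb"
)

func main() {
	diskdb := rawdb.NewMemoryDatabase()
	db := triedb.NewDatabase(diskdb, &triedb.Config{
		PathDB: &pathdb.Config{
			// Dirty-buffer allowance, fixed for the lifetime of the database.
			DirtyCacheSize: 256 * 1024 * 1024, // example value only
		},
	})
	defer db.Close()
}
```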
diff --git a/triedb/pathdb/difflayer.go b/triedb/pathdb/difflayer.go
index 6b87883482c9..71c19e75e744 100644
--- a/triedb/pathdb/difflayer.go
+++ b/triedb/pathdb/difflayer.go
@@ -125,7 +125,7 @@ func (dl *diffLayer) update(root common.Hash, id uint64, block uint64, nodes map
 }
 
 // persist flushes the diff layer and all its parent layers to disk layer.
-func (dl *diffLayer) persist(force bool) (layer, error) {
+func (dl *diffLayer) persist(force bool) (*diskLayer, error) {
 	if parent, ok := dl.parentLayer().(*diffLayer); ok {
 		// Hold the lock to prevent any read operation until the new
 		// parent is linked correctly.
@@ -147,7 +147,7 @@ func (dl *diffLayer) persist(force bool) (layer, error) {
 
 // diffToDisk merges a bottom-most diff into the persistent disk layer underneath
 // it. The method will panic if called onto a non-bottom-most diff layer.
-func diffToDisk(layer *diffLayer, force bool) (layer, error) {
+func diffToDisk(layer *diffLayer, force bool) (*diskLayer, error) {
 	disk, ok := layer.parentLayer().(*diskLayer)
 	if !ok {
 		panic(fmt.Sprintf("unknown layer type: %T", layer.parentLayer()))
diff --git a/triedb/pathdb/difflayer_test.go b/triedb/pathdb/difflayer_test.go
index 1e93a3f89214..f9c39d3085a2 100644
--- a/triedb/pathdb/difflayer_test.go
+++ b/triedb/pathdb/difflayer_test.go
@@ -30,7 +30,7 @@ import (
 func emptyLayer() *diskLayer {
 	return &diskLayer{
 		db:     New(rawdb.NewMemoryDatabase(), nil, false),
-		buffer: newNodeBuffer(DefaultBufferSize, nil, 0),
+		buffer: newNodeBuffer(defaultBufferSize, nil, 0),
 	}
 }
 
diff --git a/triedb/pathdb/disklayer.go b/triedb/pathdb/disklayer.go
index b6ae39446cf0..7239ba7d16f7 100644
--- a/triedb/pathdb/disklayer.go
+++ b/triedb/pathdb/disklayer.go
@@ -36,12 +36,13 @@ type diskLayer struct {
 	db     *Database        // Path-based trie database
 	cleans *fastcache.Cache // GC friendly memory cache of clean node RLPs
 	buffer *nodebuffer      // Node buffer to aggregate writes
+	frozen *nodebuffer      // Frozen node buffer waiting for flushing
 	stale  bool             // Signals that the layer became stale (state progressed)
 	lock   sync.RWMutex     // Lock used to protect stale flag
 }
 
 // newDiskLayer creates a new disk layer based on the passing arguments.
-func newDiskLayer(root common.Hash, id uint64, db *Database, cleans *fastcache.Cache, buffer *nodebuffer) *diskLayer {
+func newDiskLayer(root common.Hash, id uint64, db *Database, cleans *fastcache.Cache, buffer *nodebuffer, frozen *nodebuffer) *diskLayer {
 	// Initialize a clean cache if the memory allowance is not zero
 	// or reuse the provided cache if it is not nil (inherited from
 	// the original disk layer).
@@ -54,6 +55,7 @@ func newDiskLayer(root common.Hash, id uint64, db *Database, cleans *fastcache.C
 		db:     db,
 		cleans: cleans,
 		buffer: buffer,
+		frozen: frozen,
 	}
 }
 
@@ -102,16 +104,19 @@ func (dl *diskLayer) node(owner common.Hash, path []byte, depth int) ([]byte, co
 	if dl.stale {
 		return nil, common.Hash{}, nil, errSnapshotStale
 	}
-	// Try to retrieve the trie node from the not-yet-written
-	// node buffer first. Note the buffer is lock free since
-	// it's impossible to mutate the buffer before tagging the
-	// layer as stale.
-	n, found := dl.buffer.node(owner, path)
-	if found {
-		dirtyHitMeter.Mark(1)
-		dirtyReadMeter.Mark(int64(len(n.Blob)))
-		dirtyNodeHitDepthHist.Update(int64(depth))
-		return n.Blob, n.Hash, &nodeLoc{loc: locDirtyCache, depth: depth}, nil
+	// Try to retrieve the trie node from the not-yet-written node buffer first
+	// (both the live one and the frozen one). Note the buffer is lock free since
+	// it's impossible to mutate the buffer before tagging the layer as stale.
+	for _, buffer := range []*nodebuffer{dl.buffer, dl.frozen} {
+		if buffer != nil {
+			n, found := buffer.node(owner, path)
+			if found {
+				dirtyHitMeter.Mark(1)
+				dirtyReadMeter.Mark(int64(len(n.Blob)))
+				dirtyNodeHitDepthHist.Update(int64(depth))
+				return n.Blob, n.Hash, &nodeLoc{loc: locDirtyCache, depth: depth}, nil
+			}
+		}
 	}
 	dirtyMissMeter.Mark(1)
 
@@ -182,7 +187,7 @@ func (dl *diskLayer) commit(bottom *diffLayer, force bool) (*diskLayer, error) {
 	// Mark the diskLayer as stale before applying any mutations on top.
 	dl.stale = true
 
-	// Store the root->id lookup afterwards. All stored lookups are identified
+	// Store the root->id lookup afterward. All stored lookups are identified
 	// by the **unique** state root. It's impossible that in the same chain
 	// blocks are not adjacent but have the same root.
 	if dl.id == 0 {
@@ -190,21 +195,43 @@ func (dl *diskLayer) commit(bottom *diffLayer, force bool) (*diskLayer, error) {
 	}
 	rawdb.WriteStateID(dl.db.diskdb, bottom.rootHash(), bottom.stateID())
 
-	// Construct a new disk layer by merging the nodes from the provided diff
-	// layer, and flush the content in disk layer if there are too many nodes
-	// cached. The clean cache is inherited from the original disk layer.
-	ndl := newDiskLayer(bottom.root, bottom.stateID(), dl.db, dl.cleans, dl.buffer.commit(bottom.nodes))
-
 	// In a unique scenario where the ID of the oldest history object (after tail
 	// truncation) surpasses the persisted state ID, we take the necessary action
-	// of forcibly committing the cached dirty nodes to ensure that the persisted
+	// of forcibly committing the cached dirty states to ensure that the persisted
 	// state ID remains higher.
-	if !force && rawdb.ReadPersistentStateID(dl.db.diskdb) < oldest {
+	persistedID := rawdb.ReadPersistentStateID(dl.db.diskdb)
+	if !force && persistedID < oldest {
 		force = true
 	}
-	if err := ndl.buffer.flush(ndl.db.diskdb, ndl.cleans, ndl.id, force); err != nil {
-		return nil, err
+	// Merge the nodes of the bottom-most diff layer into the buffer as the combined one
+	combined := dl.buffer.commit(bottom.nodes)
+	if combined.full() || force {
+		// Wait until the previous frozen buffer is fully flushed
+		if dl.frozen != nil {
+			if err := dl.frozen.flushed(); err != nil {
+				return nil, err
+			}
+		}
+		dl.frozen = nil
+
+		// Freeze the live buffer and schedule background flushing
+		dl.frozen = combined
+		dl.frozen.flush(dl.db.diskdb, dl.cleans, bottom.stateID())
+
+		// Block until the frozen buffer is fully flushed out if the oldest history
+		// surpasses the persisted state ID.
+		if persistedID < oldest {
+			if err := dl.frozen.flushed(); err != nil {
+				return nil, err
+			}
+		}
+		combined = newNodeBuffer(dl.db.bufferSize, nil, 0)
 	}
+	// Construct a new disk layer by merging the nodes from the provided diff
+	// layer, and flush the content in disk layer if there are too many nodes
+	// cached. The clean cache is inherited from the original disk layer.
+	ndl := newDiskLayer(bottom.root, bottom.stateID(), dl.db, dl.cleans, combined, dl.frozen)
+
 	// To remove outdated history objects from the end, we set the 'tail' parameter
 	// to 'oldest-1' due to the offset between the freezer index and the history ID.
 	if overflow {
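
The new commit flow above amounts to a double-buffering handoff: merge the bottom diff into the live buffer, and once it is full (or a flush is forced), wait for any previously frozen buffer, freeze the combined one, schedule a background flush, and start a fresh live buffer. The sketch below restates only that control flow with invented stand-in types (`buffer`, `layer`); it is not the pathdb API:

```go
package main

import (
	"fmt"
	"time"
)

type buffer struct {
	size, limit int
	done        chan struct{}
	err         error // memorized flush error (always nil in this sketch)
}

func (b *buffer) full() bool { return b.size > b.limit }

func (b *buffer) flush() {
	b.done = make(chan struct{})
	go func() {
		defer close(b.done)
		time.Sleep(10 * time.Millisecond) // stand-in for the batch write to disk
	}()
}

func (b *buffer) flushed() error {
	<-b.done
	return b.err
}

// layer mimics the disk layer's pair of buffers: live receives new writes,
// frozen is the one currently being written out in the background.
type layer struct {
	live, frozen *buffer
	limit        int
}

// commit mirrors the control flow added to diskLayer.commit above: merge,
// maybe freeze, maybe block, then carry both buffers forward.
func (l *layer) commit(delta int, force bool) error {
	l.live.size += delta // stand-in for dl.buffer.commit(bottom.nodes)
	if l.live.full() || force {
		if l.frozen != nil {
			if err := l.frozen.flushed(); err != nil { // wait for the previous flush
				return err
			}
		}
		l.frozen = l.live // freeze the combined buffer
		l.frozen.flush()  // schedule the background write
		l.live = &buffer{limit: l.limit}
	}
	return nil
}

func main() {
	l := &layer{live: &buffer{limit: 64}, limit: 64}
	for i := 0; i < 5; i++ {
		if err := l.commit(40, false); err != nil {
			fmt.Println("commit:", err)
			return
		}
		fmt.Printf("live=%dB, flush in flight=%v\n", l.live.size, l.frozen != nil)
	}
	if l.frozen != nil {
		l.frozen.flushed() // what Close/Journal/cap must do before teardown
	}
}
```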
@@ -249,6 +276,13 @@ func (dl *diskLayer) revert(h *history) (*diskLayer, error) {
 			return nil, err
 		}
 	} else {
+		// Block until the frozen buffer is fully flushed
+		if dl.frozen != nil {
+			if err := dl.frozen.flushed(); err != nil {
+				return nil, err
+			}
+			dl.frozen = nil // unset the frozen buffer
+		}
 		batch := dl.db.diskdb.NewBatch()
 		writeNodes(batch, nodes, dl.cleans)
 		rawdb.WritePersistentStateID(batch, dl.id-1)
@@ -256,18 +290,7 @@ func (dl *diskLayer) revert(h *history) (*diskLayer, error) {
 			log.Crit("Failed to write states", "err", err)
 		}
 	}
-	return newDiskLayer(h.meta.parent, dl.id-1, dl.db, dl.cleans, dl.buffer), nil
-}
-
-// setBufferSize sets the node buffer size to the provided value.
-func (dl *diskLayer) setBufferSize(size int) error {
-	dl.lock.RLock()
-	defer dl.lock.RUnlock()
-
-	if dl.stale {
-		return errSnapshotStale
-	}
-	return dl.buffer.setSize(size, dl.db.diskdb, dl.cleans, dl.id)
+	return newDiskLayer(h.meta.parent, dl.id-1, dl.db, dl.cleans, dl.buffer, dl.frozen), nil
 }
 
 // size returns the approximate size of cached nodes in the disk layer.
diff --git a/triedb/pathdb/journal.go b/triedb/pathdb/journal.go
index 1740ec593511..0d4c2025a980 100644
--- a/triedb/pathdb/journal.go
+++ b/triedb/pathdb/journal.go
@@ -136,7 +136,7 @@ func (db *Database) loadLayers() layer {
 		log.Info("Failed to load journal, discard it", "err", err)
 	}
 	// Return single layer with persistent state.
-	return newDiskLayer(root, rawdb.ReadPersistentStateID(db.diskdb), db, nil, newNodeBuffer(db.bufferSize, nil, 0))
+	return newDiskLayer(root, rawdb.ReadPersistentStateID(db.diskdb), db, nil, newNodeBuffer(db.bufferSize, nil, 0), nil)
 }
 
 // loadDiskLayer reads the binary blob from the layer journal, reconstructing
@@ -176,7 +176,7 @@ func (db *Database) loadDiskLayer(r *rlp.Stream) (layer, error) {
 		nodes[entry.Owner] = subset
 	}
 	// Calculate the internal state transitions by id difference.
-	base := newDiskLayer(root, id, db, nil, newNodeBuffer(db.bufferSize, nodes, id-stored))
+	base := newDiskLayer(root, id, db, nil, newNodeBuffer(db.bufferSize, nodes, id-stored), nil)
 	return base, nil
 }
 
@@ -342,6 +342,11 @@ func (db *Database) Journal(root common.Hash) error {
 		return fmt.Errorf("triedb layer [%#x] missing", root)
 	}
 	disk := db.tree.bottom()
+	if disk.frozen != nil {
+		if err := disk.frozen.flushed(); err != nil {
+			return err
+		}
+	}
 	if l, ok := l.(*diffLayer); ok {
 		log.Info("Persisting dirty state to disk", "head", l.block, "root", root, "layers", l.id-disk.id+disk.buffer.layers)
 	} else { // disk layer only on noop runs (likely) or deep reorgs (unlikely)
diff --git a/triedb/pathdb/layertree.go b/triedb/pathdb/layertree.go
index d314779910e9..795ec7917f1d 100644
--- a/triedb/pathdb/layertree.go
+++ b/triedb/pathdb/layertree.go
@@ -131,6 +131,12 @@ func (tree *layerTree) cap(root common.Hash, layers int) error {
 		if err != nil {
 			return err
 		}
+		// Block until the frozen buffer is fully flushed
+		if base.frozen != nil {
+			if err := base.frozen.flushed(); err != nil {
+				return err
+			}
+		}
 		// Replace the entire layer tree with the flat base
 		tree.layers = map[common.Hash]layer{base.rootHash(): base}
 		return nil
diff --git a/triedb/pathdb/nodebuffer.go b/triedb/pathdb/nodebuffer.go
index d3492602c8b7..8ea64a36ad7e 100644
--- a/triedb/pathdb/nodebuffer.go
+++ b/triedb/pathdb/nodebuffer.go
@@ -39,6 +39,9 @@ type nodebuffer struct {
 	size   uint64                                    // The size of aggregated writes
 	limit  uint64                                    // The maximum memory allowance in bytes
 	nodes  map[common.Hash]map[string]*trienode.Node // The dirty node set, mapped by owner and path
+
+	done     chan struct{} // notifier whether the content in buffer has been flushed or not
+	flushErr error         // error if any exception occurs during flushing
 }
 
 // newNodeBuffer initializes the node buffer with the provided nodes.
@@ -192,11 +195,10 @@ func (b *nodebuffer) empty() bool {
 	return b.layers == 0
 }
 
-// setSize sets the buffer size to the provided number, and invokes a flush
-// operation if the current memory usage exceeds the new limit.
-func (b *nodebuffer) setSize(size int, db ethdb.KeyValueStore, clean *fastcache.Cache, id uint64) error {
-	b.limit = uint64(size)
-	return b.flush(db, clean, id, false)
+// full returns an indicator if the size of accumulated data exceeds the configured
+// threshold.
+func (b *nodebuffer) full() bool {
+	return b.size > b.limit
 }
 
 // allocBatch returns a database batch with pre-allocated buffer.
@@ -212,35 +214,45 @@ func (b *nodebuffer) allocBatch(db ethdb.KeyValueStore) ethdb.Batch {
 	return db.NewBatchWithSize((metasize + int(b.size)) * 11 / 10) // extra 10% for potential pebble internal stuff
 }
 
-// flush persists the in-memory dirty trie node into the disk if the configured
-// memory threshold is reached. Note, all data must be written atomically.
-func (b *nodebuffer) flush(db ethdb.KeyValueStore, clean *fastcache.Cache, id uint64, force bool) error {
-	if b.size <= b.limit && !force {
-		return nil
-	}
-	// Ensure the target state id is aligned with the internal counter.
-	head := rawdb.ReadPersistentStateID(db)
-	if head+b.layers != id {
-		return fmt.Errorf("buffer layers (%d) cannot be applied on top of persisted state id (%d) to reach requested state id (%d)", b.layers, head, id)
-	}
-	var (
-		start = time.Now()
-		batch = b.allocBatch(db)
-	)
-	nodes := writeNodes(batch, b.nodes, clean)
-	rawdb.WritePersistentStateID(batch, id)
+// flush persists the in-memory dirty trie node into the disk. Note, all data must be written atomically.
+func (b *nodebuffer) flush(db ethdb.KeyValueStore, clean *fastcache.Cache, id uint64) {
+	b.done = make(chan struct{})
 
-	// Flush all mutations in a single batch
-	size := batch.ValueSize()
-	if err := batch.Write(); err != nil {
-		return err
-	}
-	commitBytesMeter.Mark(int64(size))
-	commitNodesMeter.Mark(int64(nodes))
-	commitTimeTimer.UpdateSince(start)
-	log.Debug("Persisted pathdb nodes", "nodes", len(b.nodes), "bytes", common.StorageSize(size), "elapsed", common.PrettyDuration(time.Since(start)))
-	b.reset()
-	return nil
+	go func() {
+		defer func() {
+			close(b.done)
+		}()
+		// Error out if the state id is not aligned with the internal counter
+		head := rawdb.ReadPersistentStateID(db)
+		if head+b.layers != id {
+			b.flushErr = fmt.Errorf("buffer layers (%d) cannot be applied on top of persisted state id (%d) to reach requested state id (%d)", b.layers, head, id)
+			return
+		}
+		var (
+			start = time.Now()
+			batch = b.allocBatch(db)
+		)
+		nodes := writeNodes(batch, b.nodes, clean)
+		rawdb.WritePersistentStateID(batch, id)
+
+		// Flush all mutations in a single batch
+		size := batch.ValueSize()
+		if err := batch.Write(); err != nil {
+			b.flushErr = err
+			return
+		}
+		commitBytesMeter.Mark(int64(size))
+		commitNodesMeter.Mark(int64(nodes))
+		commitTimeTimer.UpdateSince(start)
+		log.Info("Persisted pathdb nodes", "nodes", nodes, "bytes", common.StorageSize(size), "elapsed", common.PrettyDuration(time.Since(start)))
+	}()
+}
+
+// flushed blocks until the buffer is fully flushed and also returns the memorized
+// error which occurs within the flushing.
+func (b *nodebuffer) flushed() error {
+	<-b.done
+	return b.flushErr
 }
 
 // writeNodes writes the trie nodes into the provided database batch.
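
For reference, the flush/flushed pair introduced in nodebuffer.go follows a common Go pattern: perform the write in a goroutine, close a `done` channel when finished, and memorize the error for whoever waits later. A self-contained sketch of that pattern (the `demoBuffer` type and its fields are invented for illustration, not pathdb types):

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// demoBuffer stands in for nodebuffer: only the fields needed to show the
// synchronization are kept, not the dirty trie nodes themselves.
type demoBuffer struct {
	data     []byte        // payload to persist
	done     chan struct{} // closed once the background flush has finished
	flushErr error         // memorized error from the background flush
}

// flush schedules the write in the background and returns immediately.
func (b *demoBuffer) flush(write func([]byte) error) {
	b.done = make(chan struct{})
	go func() {
		defer close(b.done)
		if err := write(b.data); err != nil {
			b.flushErr = err // surfaced later by flushed()
		}
	}()
}

// flushed blocks until the background flush has finished and returns the
// memorized error, mirroring nodebuffer.flushed in the patch.
func (b *demoBuffer) flushed() error {
	<-b.done
	return b.flushErr
}

func main() {
	buf := &demoBuffer{data: []byte("dirty nodes")}
	buf.flush(func(p []byte) error {
		time.Sleep(50 * time.Millisecond) // simulate a slow batch write
		if len(p) == 0 {
			return errors.New("nothing to persist")
		}
		fmt.Printf("persisted %d bytes\n", len(p))
		return nil
	})
	// The caller keeps working here, then waits before reusing or discarding
	// the buffer, as commit/Close/Journal/cap do in the patch.
	if err := buf.flushed(); err != nil {
		fmt.Println("flush failed:", err)
	}
}
```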