Skip to content

Commit

Permalink
freezerdb: fix migrate ancient data err when sync from history segment;
Browse files Browse the repository at this point in the history
chainindexer: fix index bloombits err when sync from history segment;
  • Loading branch information
0xbundler committed Dec 19, 2023
1 parent 507c0c2 commit 3be2236
Show file tree
Hide file tree
Showing 8 changed files with 234 additions and 68 deletions.
14 changes: 10 additions & 4 deletions core/blockchain_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -460,13 +460,19 @@ func (bc *BlockChain) LastHistorySegment(num uint64) *params.HistorySegment {
return segment
}

func (bc *BlockChain) WriteCanonicalHeaders(headers []*types.Header, tds []uint64) error {
func (bc *BlockChain) WriteCanonicalBlockAndReceipt(headers []*types.Header, tds []uint64, bodies []*types.Body, receipts [][]*types.Receipt) error {
batch := bc.db.NewBatch()
for i, header := range headers {
h := header.Hash()
n := header.Number.Uint64()
rawdb.WriteTd(bc.db, h, n, new(big.Int).SetUint64(tds[i]))
rawdb.WriteHeader(bc.db, header)
rawdb.WriteCanonicalHash(bc.db, h, n)
rawdb.WriteTd(batch, h, n, new(big.Int).SetUint64(tds[i]))
rawdb.WriteHeader(batch, header)
rawdb.WriteBody(batch, h, n, bodies[i])
rawdb.WriteReceipts(batch, h, n, receipts[i])
rawdb.WriteCanonicalHash(batch, h, n)
}
if err := batch.Write(); err != nil {
return err
}
return nil
}
Expand Down
5 changes: 5 additions & 0 deletions core/chain_indexer.go
Original file line number Diff line number Diff line change
Expand Up @@ -521,3 +521,8 @@ func (c *ChainIndexer) removeSectionHead(section uint64) {

c.indexDb.Delete(append([]byte("shead"), data[:]...))
}

// GetSection calculate section from head number
func (c *ChainIndexer) GetSection(head uint64) uint64 {
return (head + 1) / c.sectionSize
}
10 changes: 5 additions & 5 deletions core/rawdb/database.go
Original file line number Diff line number Diff line change
Expand Up @@ -583,11 +583,6 @@ func AncientInspect(db ethdb.Database) error {

log.Info("Inspect ancient prune situation...")
offset := counter(ReadOffSetOfCurrentAncientFreezer(db))
// if tail is not 0, just overwrite it
tail, _ := db.Tail()
if tail > 0 {
offset = counter(tail)
}
// Get number of ancient rows inside the freezer.
ancients := counter(0)
if count, err := db.ItemAmountInAncient(); err != nil {
Expand All @@ -602,6 +597,11 @@ func AncientInspect(db ethdb.Database) error {
} else {
endNumber = offset + ancients - 1
}
// if tail is not 0, just overwrite it
tail, _ := db.Tail()
if tail > 0 {
offset = counter(tail)
}
stats = [][]string{
{"Offset/StartBlockNumber", "Offset/StartBlockNumber of ancientDB", offset.String()},
{"Amount of remained items in AncientStore", "Remaining items of ancientDB", ancients.String()},
Expand Down
3 changes: 3 additions & 0 deletions eth/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -204,6 +204,9 @@ func New(stack *node.Node, config *ethconfig.Config) (*Ethereum, error) {
if p, ok := eth.engine.(consensus.PoSA); ok {
p.SetupHistorySegment(hsm)
}
if lastSegment != nil {
eth.bloomIndexer.AddCheckpoint(eth.bloomIndexer.GetSection(lastSegment.ReGenesisNumber), lastSegment.ReGenesisHash)
}
}

bcVersion := rawdb.ReadDatabaseVersion(chainDb)
Expand Down
72 changes: 60 additions & 12 deletions eth/downloader/downloader.go
Original file line number Diff line number Diff line change
Expand Up @@ -214,8 +214,8 @@ type BlockChain interface {
// LastHistorySegment get last history segment
LastHistorySegment(num uint64) *params.HistorySegment

// WriteCanonicalHeaders just write header into db, it an unsafe interface, just for history segment
WriteCanonicalHeaders([]*types.Header, []uint64) error
// WriteCanonicalBlockAndReceipt just write header into db, it an unsafe interface, just for history segment
WriteCanonicalBlockAndReceipt([]*types.Header, []uint64, []*types.Body, [][]*types.Receipt) error

// FreezerDBReset reset freezer db to target tail & head
FreezerDBReset(tail, head uint64) error
Expand Down Expand Up @@ -1779,18 +1779,66 @@ func (d *Downloader) findAncestorFromHistorySegment(p *peerConnection, remoteHei
return 0, fmt.Errorf("%w: multiple headers (%d) for single request", errBadPeer, len(headers))
}

// check if it matches local last segment
// check if it matches local previous segment
h := hashes[0]
n := headers[0].Number.Uint64()
if d.lastSegment.MatchBlock(h, n) {
if !d.blockchain.HasHeader(h, n) {
// just write header, td, because it's snap sync, just sync history is enough
if err = d.blockchain.WriteCanonicalHeaders(headers, []uint64{d.lastSegment.TD}); err != nil {
return 0, err
}
log.Debug("sync history segment header to local", "number", n, "hash", h, "segment", d.lastSegment)
}
if !d.lastSegment.MatchBlock(h, n) {
return 0, nil
}

if d.blockchain.HasHeader(h, n) {
return n, nil
}
return 0, nil

body, receipts, err := d.fetchBodyAndReceiptsByHeader(p, headers[0], hashes[0])
if err != nil {
return 0, err
}
// just write header, td, because it's snap sync, just sync history is enough
if err = d.blockchain.WriteCanonicalBlockAndReceipt(headers, []uint64{d.lastSegment.TD}, []*types.Body{body}, [][]*types.Receipt{receipts}); err != nil {
return 0, err
}
log.Debug("sync history segment header to local", "number", n, "hash", h, "segment", d.lastSegment)
return n, nil
}

func (d *Downloader) fetchBodyAndReceiptsByHeader(p *peerConnection, header *types.Header, h common.Hash) (*types.Body, []*types.Receipt, error) {
// download ancient data
bodies, bodyHashset, err := d.fetchBodiesByHashes(p, []common.Hash{h})
if err != nil {
return nil, nil, err
}
if len(bodies) != 1 {
return nil, nil, fmt.Errorf("%w: multiple bodies (%d) for single request", errBadPeer, len(bodies))
}
if header.TxHash != bodyHashset[0][0] {
return nil, nil, fmt.Errorf("%w: fetch body with wrong TxHash %v, expect %v", errBadPeer, bodyHashset[0][0], header.TxHash)
}
if header.UncleHash != bodyHashset[1][0] {
return nil, nil, fmt.Errorf("%w: fetch body with wrong UncleHash %v, expect %v", errBadPeer, bodyHashset[0][1], header.UncleHash)
}
if header.WithdrawalsHash == nil {
if bodies[0].Withdrawals != nil {
return nil, nil, fmt.Errorf("%w: fetch body with wrong Withdrawals %v, expect %v", errBadPeer, len(bodies[0].Withdrawals), header.WithdrawalsHash)
}
} else {
if bodies[0].Withdrawals == nil {
return nil, nil, fmt.Errorf("%w: fetch body with wrong Withdrawals %v, expect %v", errBadPeer, len(bodies[0].Withdrawals), *header.WithdrawalsHash)
}
if bodyHashset[2][0] != *header.WithdrawalsHash {
return nil, nil, fmt.Errorf("%w: fetch body with wrong Withdrawals %v, expect %v", errBadPeer, bodyHashset[2][0], *header.WithdrawalsHash)
}
}

receipts, receiptHashes, err := d.fetchReceiptsByHashes(p, []common.Hash{h})
if err != nil {
return nil, nil, err
}
if len(receipts) != 1 {
return nil, nil, fmt.Errorf("%w: multiple receipts (%d) for single request", errBadPeer, len(receipts))
}
if header.ReceiptHash != receiptHashes[0] {
return nil, nil, fmt.Errorf("%w: fetch receipts with wrong ReceiptHash %v, expect %v", errBadPeer, receiptHashes[0], header.ReceiptHash)
}
return bodies[0], receipts[0], nil
}
95 changes: 95 additions & 0 deletions eth/downloader/fetchers.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,3 +113,98 @@ func (d *Downloader) fetchHeadersByNumber(p *peerConnection, number uint64, amou
return *res.Res.(*eth.BlockHeadersPacket), res.Meta.([]common.Hash), nil
}
}

// fetchBodiesByHashes is a blocking version of Peer.RequestBodies
func (d *Downloader) fetchBodiesByHashes(p *peerConnection, hashes []common.Hash) ([]*types.Body, [][]common.Hash, error) {
// Create the response sink and send the network request
start := time.Now()
resCh := make(chan *eth.Response)

req, err := p.peer.RequestBodies(hashes, resCh)
if err != nil {
return nil, nil, err
}
defer req.Close()

// Wait until the response arrives, the request is cancelled or times out
ttl := d.peers.rates.TargetTimeout()

timeoutTimer := time.NewTimer(ttl)
defer timeoutTimer.Stop()

select {
case <-d.cancelCh:
return nil, nil, errCanceled

case <-timeoutTimer.C:
// Header retrieval timed out, update the metrics
p.log.Debug("Header request timed out", "elapsed", ttl)
headerTimeoutMeter.Mark(1)

return nil, nil, errTimeout

case res := <-resCh:
// Headers successfully retrieved, update the metrics
headerReqTimer.Update(time.Since(start))
headerInMeter.Mark(int64(len(*res.Res.(*eth.BlockBodiesPacket))))

// Don't reject the packet even if it turns out to be bad, downloader will
// disconnect the peer on its own terms. Simply delivery the headers to
// be processed by the caller
res.Done <- nil
packets := *res.Res.(*eth.BlockBodiesPacket)
bodies := make([]*types.Body, len(packets))
for i, p := range packets {
bodies[i] = &types.Body{
Transactions: p.Transactions,
Uncles: p.Uncles,
Withdrawals: p.Withdrawals,
}
}
hashsets := res.Meta.([][]common.Hash) // {txs hashes, uncle hashes, withdrawal hashes}
return bodies, hashsets, nil
}
}

// fetchReceiptsByHashes is a blocking version of Peer.RequestReceipts
func (d *Downloader) fetchReceiptsByHashes(p *peerConnection, hashes []common.Hash) ([][]*types.Receipt, []common.Hash, error) {
// Create the response sink and send the network request
start := time.Now()
resCh := make(chan *eth.Response)

req, err := p.peer.RequestReceipts(hashes, resCh)
if err != nil {
return nil, nil, err
}
defer req.Close()

// Wait until the response arrives, the request is cancelled or times out
ttl := d.peers.rates.TargetTimeout()

timeoutTimer := time.NewTimer(ttl)
defer timeoutTimer.Stop()

select {
case <-d.cancelCh:
return nil, nil, errCanceled

case <-timeoutTimer.C:
// Header retrieval timed out, update the metrics
p.log.Debug("Header request timed out", "elapsed", ttl)
headerTimeoutMeter.Mark(1)

return nil, nil, errTimeout

case res := <-resCh:
// Headers successfully retrieved, update the metrics
headerReqTimer.Update(time.Since(start))
headerInMeter.Mark(int64(len(*res.Res.(*eth.ReceiptsPacket))))

// Don't reject the packet even if it turns out to be bad, downloader will
// disconnect the peer on its own terms. Simply delivery the headers to
// be processed by the caller
res.Done <- nil
hashes := res.Meta.([]common.Hash) // {receipt hashes}
return *res.Res.(*eth.ReceiptsPacket), hashes, nil
}
}
Loading

0 comments on commit 3be2236

Please sign in to comment.