Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: use binary diff to persist finalized states #7005

Open
wants to merge 42 commits into
base: feature/differential-archive
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
42 commits
Select commit Hold shift + click to select a range
7e89c60
Add binary diff codec
nazarhussain Aug 6, 2024
865212f
Add snapshot strategy
nazarhussain Aug 6, 2024
db6ba48
Rename the utils file
nazarhussain Aug 6, 2024
321c16b
Add diff strategy
nazarhussain Aug 6, 2024
c9a12a5
Rename historical sate util file
nazarhussain Aug 6, 2024
f33926c
Add skipped slot historical state
nazarhussain Aug 6, 2024
888853e
Add worker support
nazarhussain Aug 6, 2024
c6e5429
Update the archiver
nazarhussain Aug 6, 2024
0a9a262
Fix the types
nazarhussain Aug 6, 2024
dd8d8b2
Add vcdiff package
nazarhussain Aug 6, 2024
1827f39
Add unit tests for multiple diff
nazarhussain Aug 8, 2024
b7902ce
Add diff state tests
nazarhussain Aug 8, 2024
9f808fa
Add support to fetch last db state
nazarhussain Aug 8, 2024
871a2b2
Disable storing anchor state
nazarhussain Aug 8, 2024
481e11c
Extract historical state metrics
nazarhussain Aug 8, 2024
6df2a49
Update metrics usage
nazarhussain Aug 8, 2024
e46db7b
Add diff and snapshot size metrics
nazarhussain Aug 8, 2024
448e5b8
Add doc for historical state
nazarhussain Aug 8, 2024
d152046
Rename skip strategy to block replay
nazarhussain Aug 12, 2024
89d8349
Add diff layer configurations
nazarhussain Aug 12, 2024
1c877e2
Implement the state diff layers
nazarhussain Aug 12, 2024
60026f4
Use diff layers
nazarhussain Aug 13, 2024
d7cdb29
Remove the obsolete stateArchiveEpochFrequency option
nazarhussain Aug 13, 2024
268eb29
Rename stateArchive to stateSnapshotArchive
nazarhussain Aug 13, 2024
1f6293d
Split state snapshot and diff archives
nazarhussain Aug 13, 2024
0f48b51
Use state diff repository
nazarhussain Aug 13, 2024
abeb1c6
Use historical regen object
nazarhussain Aug 13, 2024
1c2a2bb
Fix lint errors
nazarhussain Aug 13, 2024
4cfb002
Refactored per suggested feedback
nazarhussain Aug 15, 2024
2b010a9
Add documentation details with more explanation
nazarhussain Aug 15, 2024
ec14a6d
Add logs to historical state
nazarhussain Aug 15, 2024
bf67d1e
Add perf tests and few analysis about the defaults
nazarhussain Aug 15, 2024
1bc8edd
Add unit tests for diffs
nazarhussain Aug 15, 2024
b00f8b1
Remove docs from this PR
nazarhussain Aug 24, 2024
74925b4
Change the defaul options for the diff layers
nazarhussain Aug 24, 2024
d60857e
Rename an attribute
nazarhussain Sep 12, 2024
bad4039
Fix some code after rebase
nazarhussain Sep 12, 2024
85262c7
Add dashbaord for hisotical state
nazarhussain Sep 12, 2024
4480a26
Add more logging
nazarhussain Sep 13, 2024
e34caa3
Add unit tests for last stored state
nazarhussain Sep 16, 2024
bc88c4a
Change default value for diff layers
nazarhussain Sep 16, 2024
7eb7c17
Add more logging for debugging
nazarhussain Sep 16, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
746 changes: 746 additions & 0 deletions dashboards/lodestar_historical_state_regen.json

Large diffs are not rendered by default.

2 changes: 2 additions & 0 deletions packages/beacon-node/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,8 @@
"strict-event-emitter-types": "^2.0.0",
"systeminformation": "^5.22.9",
"uint8arraylist": "^2.4.7",
"xdelta3-wasm": "^1.0.0",
"vcdiff-wasm": "^1.0.10",
"xxhash-wasm": "1.0.2"
},
"devDependencies": {
Expand Down
2 changes: 1 addition & 1 deletion packages/beacon-node/src/api/impl/lodestar/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,7 @@ export function getLodestarApi({
},

async dumpDbStateIndex() {
return {data: await db.stateArchive.dumpRootIndexEntries()};
return {data: await db.stateSnapshotArchive.dumpRootIndexEntries()};
},
};
}
Expand Down
124 changes: 19 additions & 105 deletions packages/beacon-node/src/chain/archiver/archiveStates.ts
Original file line number Diff line number Diff line change
@@ -1,26 +1,12 @@
import {Logger} from "@lodestar/utils";
import {SLOTS_PER_EPOCH} from "@lodestar/params";
import {Slot, Epoch} from "@lodestar/types";
import {computeEpochAtSlot, computeStartSlotAtEpoch} from "@lodestar/state-transition";
import {CheckpointWithHex} from "@lodestar/fork-choice";
import {Logger} from "@lodestar/utils";
import {computeStartSlotAtEpoch} from "@lodestar/state-transition";
import {CachedBeaconStateAllForks} from "@lodestar/state-transition";
import {IBeaconDb} from "../../db/index.js";
import {IStateRegenerator} from "../regen/interface.js";
import {getStateSlotFromBytes} from "../../util/multifork.js";
import {serializeState} from "../serializeState.js";
import {AllocSource, BufferPool} from "../../util/bufferPool.js";

/**
* Minimum number of epochs between single temp archived states
* These states will be pruned once a new state is persisted
*/
const PERSIST_TEMP_STATE_EVERY_EPOCHS = 32;
import {IHistoricalStateRegen} from "../historicalState/types.js";

export interface StatesArchiverOpts {
/**
* Minimum number of epochs between archived states
*/
archiveStateEpochFrequency: number;
}
export interface StatesArchiverOpts {}

/**
* Archives finalized states from active bucket to archive bucket.
Expand All @@ -29,57 +15,15 @@ export interface StatesArchiverOpts {
*/
export class StatesArchiver {
constructor(
private readonly historicalStateRegen: IHistoricalStateRegen | undefined,
private readonly regen: IStateRegenerator,
private readonly db: IBeaconDb,
private readonly logger: Logger,
private readonly opts: StatesArchiverOpts,
private readonly bufferPool?: BufferPool | null
private readonly opts: StatesArchiverOpts
) {}

/**
* Persist states every some epochs to
* - Minimize disk space, storing the least states possible
* - Minimize the sync progress lost on unexpected crash, storing temp state every few epochs
*
* At epoch `e` there will be states peristed at intervals of `PERSIST_STATE_EVERY_EPOCHS` = 32
* and one at `PERSIST_TEMP_STATE_EVERY_EPOCHS` = 1024
* ```
* | | | .
* epoch - 1024*2 epoch - 1024 epoch - 32 epoch
* ```
*/
async maybeArchiveState(finalized: CheckpointWithHex): Promise<void> {
const lastStoredSlot = await this.db.stateArchive.lastKey();
const lastStoredEpoch = computeEpochAtSlot(lastStoredSlot ?? 0);
const {archiveStateEpochFrequency} = this.opts;

if (finalized.epoch - lastStoredEpoch >= Math.min(PERSIST_TEMP_STATE_EVERY_EPOCHS, archiveStateEpochFrequency)) {
await this.archiveState(finalized);

// Only check the current and previous intervals
const minEpoch = Math.max(
0,
(Math.floor(finalized.epoch / archiveStateEpochFrequency) - 1) * archiveStateEpochFrequency
);

const storedStateSlots = await this.db.stateArchive.keys({
lt: computeStartSlotAtEpoch(finalized.epoch),
gte: computeStartSlotAtEpoch(minEpoch),
});

const statesSlotsToDelete = computeStateSlotsToDelete(storedStateSlots, archiveStateEpochFrequency);
if (statesSlotsToDelete.length > 0) {
await this.db.stateArchive.batchDelete(statesSlotsToDelete);
}

// More logs to investigate the rss spike issue https://github.com/ChainSafe/lodestar/issues/5591
this.logger.verbose("Archived state completed", {
finalizedEpoch: finalized.epoch,
minEpoch,
storedStateSlots: storedStateSlots.join(","),
statesSlotsToDelete: statesSlotsToDelete.join(","),
});
}
await this.archiveState(finalized);
}

/**
Expand All @@ -88,49 +32,19 @@ export class StatesArchiver {
*/
async archiveState(finalized: CheckpointWithHex): Promise<void> {
// starting from Mar 2024, the finalized state could be from disk or in memory
const finalizedStateOrBytes = await this.regen.getCheckpointStateOrBytes(finalized);
const {rootHex} = finalized;
if (!finalizedStateOrBytes) {
throw Error(`No state in cache for finalized checkpoint state epoch #${finalized.epoch} root ${rootHex}`);
}
if (finalizedStateOrBytes instanceof Uint8Array) {
const slot = getStateSlotFromBytes(finalizedStateOrBytes);
await this.db.stateArchive.putBinary(slot, finalizedStateOrBytes);
this.logger.verbose("Archived finalized state bytes", {epoch: finalized.epoch, slot, root: rootHex});
} else {
// serialize state using BufferPool if provided
await serializeState(
finalizedStateOrBytes,
AllocSource.ARCHIVE_STATE,
(stateBytes) => this.db.stateArchive.putBinary(finalizedStateOrBytes.slot, stateBytes),
this.bufferPool
);
// don't delete states before the finalized state, auto-prune will take care of it
this.logger.verbose("Archived finalized state", {
epoch: finalized.epoch,
slot: finalizedStateOrBytes.slot,
root: rootHex,
});
const state = await this.regen.getCheckpointStateOrBytes(finalized);
if (state === null) {
this.logger.warn("Checkpoint state not available to archive.", {epoch: finalized.epoch, root: finalized.rootHex});
return;
}
}
}

/**
* Keeps first epoch per interval of persistEveryEpochs, deletes the rest
*/
export function computeStateSlotsToDelete(storedStateSlots: Slot[], persistEveryEpochs: Epoch): Slot[] {
const persistEverySlots = persistEveryEpochs * SLOTS_PER_EPOCH;
const intervalsWithStates = new Set<number>();
const stateSlotsToDelete = new Set<number>();

for (const slot of storedStateSlots) {
const interval = Math.floor(slot / persistEverySlots);
if (intervalsWithStates.has(interval)) {
stateSlotsToDelete.add(slot);
} else {
intervalsWithStates.add(interval);
if (Array.isArray(state) && state.constructor === Uint8Array) {
return this.historicalStateRegen?.storeHistoricalState(computeStartSlotAtEpoch(finalized.epoch), state);
}
}

return Array.from(stateSlotsToDelete.values());
return this.historicalStateRegen?.storeHistoricalState(
(state as CachedBeaconStateAllForks).slot,
(state as CachedBeaconStateAllForks).serialize()
);
}
}
4 changes: 3 additions & 1 deletion packages/beacon-node/src/chain/archiver/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import {IBeaconDb} from "../../db/index.js";
import {JobItemQueue} from "../../util/queue/index.js";
import {IBeaconChain} from "../interface.js";
import {ChainEvent} from "../emitter.js";
import {DiffLayers} from "../historicalState/diffLayers.js";
import {StatesArchiver, StatesArchiverOpts} from "./archiveStates.js";
import {archiveBlocks} from "./archiveBlocks.js";

Expand Down Expand Up @@ -43,12 +44,13 @@ export class Archiver {
constructor(
private readonly db: IBeaconDb,
private readonly chain: IBeaconChain,
private readonly diffLayers: DiffLayers,
private readonly logger: Logger,
signal: AbortSignal,
opts: ArchiverOpts
) {
this.archiveBlobEpochs = opts.archiveBlobEpochs;
this.statesArchiver = new StatesArchiver(chain.regen, db, logger, opts, chain.bufferPool);
this.statesArchiver = new StatesArchiver(chain.historicalStateRegen, chain.regen, db, logger, opts);
this.prevFinalized = chain.forkChoice.getFinalizedCheckpoint();
this.jobQueue = new JobItemQueue<[CheckpointWithHex], void>(this.processFinalizedCheckpoint, {
maxLength: PROCESS_FINALIZED_CHECKPOINT_QUEUE_LEN,
Expand Down
12 changes: 7 additions & 5 deletions packages/beacon-node/src/chain/chain.ts
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,6 @@ import {BlockAttributes, produceBlockBody, produceCommonBlockBody} from "./produ
import {computeNewStateRoot} from "./produceBlock/computeNewStateRoot.js";
import {BlockInput} from "./blocks/types.js";
import {SeenAttestationDatas} from "./seenCache/seenAttestationData.js";
import {HistoricalStateRegen} from "./historicalState/index.js";
import {BlockRewards, computeBlockRewards} from "./rewards/blockRewards.js";
import {ShufflingCache} from "./shufflingCache.js";
import {BlockStateCacheImpl} from "./stateCache/blockStateCacheImpl.js";
Expand All @@ -101,6 +100,8 @@ import {DbCPStateDatastore} from "./stateCache/datastore/db.js";
import {FileCPStateDatastore} from "./stateCache/datastore/file.js";
import {SyncCommitteeRewards, computeSyncCommitteeRewards} from "./rewards/syncCommitteeRewards.js";
import {AttestationsRewards, computeAttestationsRewards} from "./rewards/attestationsRewards.js";
import {DiffLayers} from "./historicalState/diffLayers.js";
import {IHistoricalStateRegen} from "./historicalState/types.js";

/**
* Arbitrary constants, blobs and payloads should be consumed immediately in the same slot
Expand Down Expand Up @@ -130,7 +131,7 @@ export class BeaconChain implements IBeaconChain {
readonly regen: QueuedStateRegenerator;
readonly lightClientServer?: LightClientServer;
readonly reprocessController: ReprocessController;
readonly historicalStateRegen?: HistoricalStateRegen;
readonly historicalStateRegen?: IHistoricalStateRegen;

// Ops pool
readonly attestationPool: AttestationPool;
Expand Down Expand Up @@ -201,7 +202,7 @@ export class BeaconChain implements IBeaconChain {
eth1: IEth1ForBlockProduction;
executionEngine: IExecutionEngine;
executionBuilder?: IExecutionBuilder;
historicalStateRegen?: HistoricalStateRegen;
historicalStateRegen?: IHistoricalStateRegen;
}
) {
this.opts = opts;
Expand Down Expand Up @@ -334,7 +335,8 @@ export class BeaconChain implements IBeaconChain {
this.bls = bls;
this.emitter = emitter;

this.archiver = new Archiver(db, this, logger, signal, opts);
// TODO: Decouple DiffLayers from archiver
this.archiver = new Archiver(db, this, new DiffLayers(), logger, signal, opts);
// always run PrepareNextSlotScheduler except for fork_choice spec tests
if (!opts?.disablePrepareNextSlot) {
new PrepareNextSlotScheduler(this, this.config, metrics, this.logger, signal);
Expand Down Expand Up @@ -521,7 +523,7 @@ export class BeaconChain implements IBeaconChain {
};
}

const data = await this.db.stateArchive.getByRoot(fromHexString(stateRoot));
const data = await this.db.stateSnapshotArchive.getByRoot(fromHexString(stateRoot));
return data && {state: data, executionOptimistic: false, finalized: true};
}

Expand Down
112 changes: 112 additions & 0 deletions packages/beacon-node/src/chain/historicalState/diffLayers.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
import {Slot} from "@lodestar/types";
import {SLOTS_PER_EPOCH} from "@lodestar/params";
import {StateArchiveStrategy} from "./types.js";

/*
* Computed over dev machine with performance tests a diff patch take ~325us
* So a duration of 1024 epochs can be covered with maximum 3 diffs and that will take ~1ms without IO time
* For block replay it depends upon exactly which slot user requested and what contains in those blocks,
* but there will always be less than 4 epochs of the block replay.
*
* NOTE: Changing this default will require nodes to resync.
*/
export const DEFAULT_DIFF_LAYERS = "2, 8, 32, 128, 512";

export class DiffLayers {
private snapshotEverySlot: number;
private diffEverySlot: number[];

/**
* Initialized with the comma separated values in ascending order e.g. 2,4,6,10
* These values will represent every nth epoch and each consider as a layer
* The last value which should be highest should be consider as snapshot layer.
*/
constructor(layers?: string) {
const epochs = DiffLayers.parse(layers ?? DEFAULT_DIFF_LAYERS);
this.snapshotEverySlot = epochs[epochs.length - 1] * SLOTS_PER_EPOCH;
this.diffEverySlot = epochs
.slice(0, -1)
// Reverse here, so lower layer get higher priority when matching
.reverse()
.map((s) => s * SLOTS_PER_EPOCH);
}

getLayersString(): string {
return `${this.diffEverySlot
.reverse()
.map((s) => s / SLOTS_PER_EPOCH)
.join(",")},${this.snapshotEverySlot / SLOTS_PER_EPOCH}`;
}

get totalLayers(): number {
return this.diffEverySlot.length + 1;
}

static parse(layers: string): number[] {
const layerEpochs = [
...new Set(
layers
.split(",")
.map((s) => s.trim())
.map((n) => parseInt(n))
),
];

if (layerEpochs.length !== layers.split(",").length) {
throw new Error(`Please provide unique epoch intervals. Given = ${layers}`);
}

if ([...layerEpochs].sort((a, b) => a - b).join(",") !== layerEpochs.join(",")) {
throw new Error(`Please provide diff layers in ascending order. Given = ${layers}`);
}

return layerEpochs;
}

getArchiveStrategy(slot: Slot): StateArchiveStrategy {
if (slot === 0) {
return StateArchiveStrategy.Snapshot;
}

if (slot % this.snapshotEverySlot === 0) return StateArchiveStrategy.Snapshot;
if (this.diffEverySlot.some((s) => slot % s === 0)) return StateArchiveStrategy.Diff;

return StateArchiveStrategy.BlockReplay;
}

getArchiveLayers(slot: Slot): Slot[] {
const path: Slot[] = [];
let lastSlot: number | undefined = undefined;

for (let layer = 0; layer < this.totalLayers; layer++) {
const newSlot = this.getLastSlotForLayer(slot, layer);
if (lastSlot === undefined || newSlot > lastSlot) {
lastSlot = newSlot;
path.push(newSlot);
}
}
return [...new Set(path)];
}

getLastSlotForLayer(slot: Slot, layer: number): Slot {
if (layer < 0 || layer > this.totalLayers) {
throw new Error(`Invalid layer number. Must be between 0-${this.totalLayers - 1}`);
}

if (layer === 0) {
if (slot % this.snapshotEverySlot === 0) {
return slot;
} else {
return Math.max(0, slot - (slot % this.snapshotEverySlot));
}
}

const diffEverySlot = this.diffEverySlot[layer - 1];

if (slot % diffEverySlot === 0) {
return slot;
} else {
return Math.max(0, slot - (slot % diffEverySlot));
}
}
}
Loading
Loading