Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add insturmentation to attestation and epoch quote mem pools #9055

Open
wants to merge 6 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions yarn-project/circuit-types/src/p2p/block_attestation.ts
Original file line number Diff line number Diff line change
Expand Up @@ -72,4 +72,8 @@ export class BlockAttestation extends Gossipable {
static empty(): BlockAttestation {
return new BlockAttestation(ConsensusPayload.empty(), Signature.empty());
}

getSize(): number {
return this.payload.getSize() + this.signature.getSize();
}
}
4 changes: 4 additions & 0 deletions yarn-project/circuit-types/src/p2p/block_proposal.ts
Original file line number Diff line number Diff line change
Expand Up @@ -75,4 +75,8 @@ export class BlockProposal extends Gossipable {
const reader = BufferReader.asReader(buf);
return new BlockProposal(reader.readObject(ConsensusPayload), reader.readObject(Signature));
}

getSize(): number {
return this.payload.getSize() + this.signature.getSize();
}
}
8 changes: 8 additions & 0 deletions yarn-project/circuit-types/src/p2p/consensus_payload.ts
Original file line number Diff line number Diff line change
Expand Up @@ -50,4 +50,12 @@ export class ConsensusPayload implements Signable {
static empty(): ConsensusPayload {
return new ConsensusPayload(Header.empty(), Fr.ZERO, []);
}

/**
* Get the size of the consensus payload in bytes.
* @returns The size of the consensus payload.
*/
getSize(): number {
return this.header.getSize() + this.archive.size + this.txHashes.length * TxHash.SIZE;
}
}
7 changes: 7 additions & 0 deletions yarn-project/circuit-types/src/p2p/gossipable.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,4 +23,11 @@ export abstract class Gossipable {
* - Serialization method
*/
abstract toBuffer(): Buffer;

/**
* Get the size of the gossipable object.
*
* This is used for metrics recording.
*/
abstract getSize(): number;
}
Original file line number Diff line number Diff line change
Expand Up @@ -69,4 +69,12 @@ export class EpochProofQuote extends Gossipable {
signature: this.signature.toViemSignature(),
};
}

/**
* Get the size of the epoch proof quote in bytes.
* @returns The size of the epoch proof quote in bytes.
*/
getSize(): number {
return this.payload.getSize() + this.signature.getSize();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,11 @@ export class EpochProofQuotePayload {
};
}

getSize(): number {
// 32 bytes for epochToProve, 32 bytes for validUntilSlot, 32 bytes for bondAmount, 20 bytes for prover, 4 bytes for basisPointFee
return 32 + 32 + 32 + EthAddress.SIZE_IN_BYTES + 4;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This kind of stuff scares me for when things get out of sync. It doesn't feel like this should be called often- can we just call toBuffer and get length?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could cache the result if performance was a concern.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

perf was the concern, ill play with that idea though

}

[inspect.custom](): string {
return `EpochProofQuotePayload { epochToProve: ${this.epochToProve}, validUntilSlot: ${this.validUntilSlot}, bondAmount: ${this.bondAmount}, prover: ${this.prover}, basisPointFee: ${this.basisPointFee} }`;
}
Expand Down
5 changes: 5 additions & 0 deletions yarn-project/foundation/src/eth-signature/eth_signature.ts
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,11 @@ export class Signature {
return serializeToBuffer([this.r, this.s, this.v]);
}

getSize(): number {
// 32 bytes for r, 32 bytes for s, 4 bytes for v
return Buffer32.SIZE + Buffer32.SIZE + 4;
}

to0xString(): `0x${string}` {
return `0x${this.r.toString()}${this.s.toString()}${this.v.toString(16)}`;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,11 @@
import { type BlockAttestation } from '@aztec/circuit-types';
import { Fr } from '@aztec/foundation/fields';
import { NoopTelemetryClient } from '@aztec/telemetry-client/noop';

import { type MockProxy, mock } from 'jest-mock-extended';
import { type PrivateKeyAccount } from 'viem';

import { type PoolInstrumentation } from '../tx_pool/instrumentation.js';
import { InMemoryAttestationPool } from './memory_attestation_pool.js';
import { generateAccount, mockAttestation } from './mocks.js';

Expand All @@ -10,10 +14,20 @@ const NUMBER_OF_SIGNERS_PER_TEST = 4;
describe('MemoryAttestationPool', () => {
let ap: InMemoryAttestationPool;
let signers: PrivateKeyAccount[];
const telemetry = new NoopTelemetryClient();

// Check that metrics are recorded correctly
let metricsMock: MockProxy<PoolInstrumentation<BlockAttestation>>;

beforeEach(() => {
ap = new InMemoryAttestationPool();
// Use noop telemetry client while testing.

ap = new InMemoryAttestationPool(telemetry);
signers = Array.from({ length: NUMBER_OF_SIGNERS_PER_TEST }, generateAccount);

metricsMock = mock<PoolInstrumentation<BlockAttestation>>();
// Can i overwrite this like this??
(ap as any).metrics = metricsMock;
});

it('should add attestations to pool', async () => {
Expand All @@ -25,6 +39,9 @@ describe('MemoryAttestationPool', () => {

await ap.addAttestations(attestations);

// Check metrics have been updated.
expect(metricsMock.recordAddedObjects).toHaveBeenCalledWith(attestations.length);

const retreivedAttestations = await ap.getAttestationsForSlot(BigInt(slotNumber), proposalId);

expect(retreivedAttestations.length).toBe(NUMBER_OF_SIGNERS_PER_TEST);
Expand All @@ -33,6 +50,8 @@ describe('MemoryAttestationPool', () => {
// Delete by slot
await ap.deleteAttestationsForSlot(BigInt(slotNumber));

expect(metricsMock.recordRemovedObjects).toHaveBeenCalledWith(attestations.length);

const retreivedAttestationsAfterDelete = await ap.getAttestationsForSlot(BigInt(slotNumber), proposalId);
expect(retreivedAttestationsAfterDelete.length).toBe(0);
});
Expand Down Expand Up @@ -82,12 +101,16 @@ describe('MemoryAttestationPool', () => {

await ap.addAttestations(attestations);

expect(metricsMock.recordAddedObjects).toHaveBeenCalledWith(attestations.length);

const retreivedAttestations = await ap.getAttestationsForSlot(BigInt(slotNumber), proposalId);
expect(retreivedAttestations.length).toBe(NUMBER_OF_SIGNERS_PER_TEST);
expect(retreivedAttestations).toEqual(attestations);

await ap.deleteAttestations(attestations);

expect(metricsMock.recordRemovedObjects).toHaveBeenCalledWith(attestations.length);

const gottenAfterDelete = await ap.getAttestationsForSlot(BigInt(slotNumber), proposalId);
expect(gottenAfterDelete.length).toBe(0);
});
Expand Down Expand Up @@ -118,12 +141,16 @@ describe('MemoryAttestationPool', () => {

await ap.addAttestations(attestations);

expect(metricsMock.recordAddedObjects).toHaveBeenCalledWith(attestations.length);

const retreivedAttestations = await ap.getAttestationsForSlot(BigInt(slotNumber), proposalId);
expect(retreivedAttestations.length).toBe(NUMBER_OF_SIGNERS_PER_TEST);
expect(retreivedAttestations).toEqual(attestations);

await ap.deleteAttestationsForSlotAndProposal(BigInt(slotNumber), proposalId);

expect(metricsMock.recordRemovedObjects).toHaveBeenCalledWith(attestations.length);

const retreivedAttestationsAfterDelete = await ap.getAttestationsForSlot(BigInt(slotNumber), proposalId);
expect(retreivedAttestationsAfterDelete.length).toBe(0);
});
Expand Down
43 changes: 37 additions & 6 deletions yarn-project/p2p/src/attestation_pool/memory_attestation_pool.ts
Original file line number Diff line number Diff line change
@@ -1,13 +1,18 @@
import { type BlockAttestation } from '@aztec/circuit-types';
import { createDebugLogger } from '@aztec/foundation/log';
import { type TelemetryClient } from '@aztec/telemetry-client';

import { PoolInstrumentation } from '../tx_pool/instrumentation.js';
import { type AttestationPool } from './attestation_pool.js';

export class InMemoryAttestationPool implements AttestationPool {
private metrics: PoolInstrumentation<BlockAttestation>;

private attestations: Map</*slot=*/ bigint, Map</*proposalId*/ string, Map</*address=*/ string, BlockAttestation>>>;

constructor(private log = createDebugLogger('aztec:attestation_pool')) {
constructor(telemetry: TelemetryClient, private log = createDebugLogger('aztec:attestation_pool')) {
this.attestations = new Map();
this.metrics = new PoolInstrumentation(telemetry, 'InMemoryAttestationPool');
}

public getAttestationsForSlot(slot: bigint, proposalId: string): Promise<BlockAttestation[]> {
Expand Down Expand Up @@ -35,21 +40,46 @@ export class InMemoryAttestationPool implements AttestationPool {

this.log.verbose(`Added attestation for slot ${slotNumber} from ${address}`);
}

// TODO: set these to pending or something ????
this.metrics.recordAddedObjects(attestations.length);
return Promise.resolve();
}

#getNumberOfAttestationsInSlot(slot: bigint): number {
let total = 0;
const slotAttestationMap = getSlotOrDefault(this.attestations, slot);

if (slotAttestationMap) {
for (const proposalAttestationMap of slotAttestationMap.values() ?? []) {
total += proposalAttestationMap.size;
}
}
return total;
}

public deleteAttestationsForSlot(slot: bigint): Promise<void> {
// TODO(md): check if this will free the memory of the inner hash map
// We count the number of attestations we are removing
const numberOfAttestations = this.#getNumberOfAttestationsInSlot(slot);

this.attestations.delete(slot);
this.log.verbose(`Removed attestation for slot ${slot}`);
this.log.verbose(`Removed ${numberOfAttestations} attestations for slot ${slot}`);

this.metrics.recordRemovedObjects(numberOfAttestations);
return Promise.resolve();
}

public deleteAttestationsForSlotAndProposal(slot: bigint, proposalId: string): Promise<void> {
const slotAttestationMap = this.attestations.get(slot);
const slotAttestationMap = getSlotOrDefault(this.attestations, slot);
if (slotAttestationMap) {
slotAttestationMap.delete(proposalId);
this.log.verbose(`Removed attestation for slot ${slot}`);
if (slotAttestationMap.has(proposalId)) {
const numberOfAttestations = slotAttestationMap.get(proposalId)?.size ?? 0;

slotAttestationMap.delete(proposalId);

this.log.verbose(`Removed ${numberOfAttestations} attestations for slot ${slot} and proposal ${proposalId}`);
this.metrics.recordRemovedObjects(numberOfAttestations);
}
}
return Promise.resolve();
}
Expand All @@ -68,6 +98,7 @@ export class InMemoryAttestationPool implements AttestationPool {
}
}
}
this.metrics.recordRemovedObjects(attestations.length);
return Promise.resolve();
}
}
Expand Down
4 changes: 2 additions & 2 deletions yarn-project/p2p/src/client/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,8 @@ export const createP2PClient = async (
let config = { ..._config };
const store = deps.store ?? (await createStore('p2p', config, createDebugLogger('aztec:p2p:lmdb')));
const txPool = deps.txPool ?? new AztecKVTxPool(store, telemetry);
const attestationsPool = deps.attestationsPool ?? new InMemoryAttestationPool();
const epochProofQuotePool = deps.epochProofQuotePool ?? new MemoryEpochProofQuotePool();
const attestationsPool = deps.attestationsPool ?? new InMemoryAttestationPool(telemetry);
const epochProofQuotePool = deps.epochProofQuotePool ?? new MemoryEpochProofQuotePool(telemetry);

let p2pService;

Expand Down
Original file line number Diff line number Diff line change
@@ -1,19 +1,31 @@
import { mockEpochProofQuote } from '@aztec/circuit-types';
import { type EpochProofQuote, mockEpochProofQuote } from '@aztec/circuit-types';
import { NoopTelemetryClient } from '@aztec/telemetry-client/noop';

import { type MockProxy, mock } from 'jest-mock-extended';

import { type PoolInstrumentation } from '../tx_pool/instrumentation.js';
import { MemoryEpochProofQuotePool } from './memory_epoch_proof_quote_pool.js';

describe('MemoryEpochProofQuotePool', () => {
let pool: MemoryEpochProofQuotePool;

let metricsMock: MockProxy<PoolInstrumentation<EpochProofQuote>>;

beforeEach(() => {
pool = new MemoryEpochProofQuotePool();
const telemetry = new NoopTelemetryClient();
pool = new MemoryEpochProofQuotePool(telemetry);

metricsMock = mock<PoolInstrumentation<EpochProofQuote>>();
(pool as any).metrics = metricsMock;
});

it('should add/get quotes to/from pool', () => {
const quote = mockEpochProofQuote(5n);

pool.addQuote(quote);

expect(metricsMock.recordAddedObjects).toHaveBeenCalledWith(1);

const quotes = pool.getQuotes(quote.payload.epochToProve);

expect(quotes).toHaveLength(1);
Expand All @@ -36,13 +48,16 @@ describe('MemoryEpochProofQuotePool', () => {

const quotes3 = pool.getQuotes(3n);
const quotesForEpoch3 = proofQuotes.filter(x => x.payload.epochToProve === 3n);
const quotesForEpoch2 = proofQuotes.filter(x => x.payload.epochToProve === 2n);

expect(quotes3).toHaveLength(quotesForEpoch3.length);
expect(quotes3).toEqual(quotesForEpoch3);

// should delete all quotes for epochs 2 and 3
pool.deleteQuotesToEpoch(3n);

expect(metricsMock.recordRemovedObjects).toHaveBeenCalledWith(quotesForEpoch2.length + quotesForEpoch3.length);

expect(pool.getQuotes(2n)).toHaveLength(0);
expect(pool.getQuotes(3n)).toHaveLength(0);

Expand Down
Original file line number Diff line number Diff line change
@@ -1,26 +1,41 @@
import { type EpochProofQuote } from '@aztec/circuit-types';
import { type TelemetryClient } from '@aztec/telemetry-client';

import { PoolInstrumentation } from '../tx_pool/instrumentation.js';
import { type EpochProofQuotePool } from './epoch_proof_quote_pool.js';

export class MemoryEpochProofQuotePool implements EpochProofQuotePool {
private quotes: Map<bigint, EpochProofQuote[]>;
constructor() {
private metrics: PoolInstrumentation<EpochProofQuote>;

constructor(telemetry: TelemetryClient) {
this.quotes = new Map();
this.metrics = new PoolInstrumentation(telemetry, 'MemoryEpochProofQuotePool');
}

addQuote(quote: EpochProofQuote) {
const epoch = quote.payload.epochToProve;
if (!this.quotes.has(epoch)) {
this.quotes.set(epoch, []);
}
this.quotes.get(epoch)!.push(quote);

this.metrics.recordAddedObjects(1);
}
getQuotes(epoch: bigint): EpochProofQuote[] {
return this.quotes.get(epoch) || [];
}
deleteQuotesToEpoch(epoch: bigint): void {
const expiredEpochs = Array.from(this.quotes.keys()).filter(k => k <= epoch);

let removedObjectsCount = 0;
for (const expiredEpoch of expiredEpochs) {
// For logging
removedObjectsCount += this.quotes.get(expiredEpoch)?.length || 0;

this.quotes.delete(expiredEpoch);
}

this.metrics.recordRemovedObjects(removedObjectsCount);
}
}
Loading
Loading