Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: standardized peerdas metrics #7095

Draft
wants to merge 7 commits into
base: peerDAS
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions packages/beacon-node/src/api/impl/beacon/blocks/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,9 @@ export function getBeaconBlockApi({
const fork = config.getForkName(signedBlock.message.slot);
let blockData: BlockInputData;
if (fork === ForkName.peerdas) {
const timer = chain.metrics?.peerDas.sidecarComputationTimeInSec.startTimer();
dataColumnSidecars = computeDataColumnSidecars(config, signedBlock, signedBlockOrContents);
timer?.();
blockData = {
fork,
dataColumnsLen: dataColumnSidecars.length,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ export async function verifyBlocksDataAvailability(
}

async function maybeValidateBlobs(
chain: {config: ChainForkConfig; genesisTime: UintNum64; logger: Logger},
chain: {config: ChainForkConfig; genesisTime: UintNum64; logger: Logger; metrics: Metrics | null},
blockInput: BlockInput,
opts: ImportBlockOpts
): Promise<{dataAvailabilityStatus: DataAvailabilityStatus; availableBlockInput: BlockInput}> {
Expand Down Expand Up @@ -119,7 +119,9 @@ async function maybeValidateBlobs(
const {dataColumns} = blockData;
const skipProofsCheck = opts.validBlobSidecars === BlobSidecarValidation.Individual;
// might require numColumns, custodyColumns from blockData as input to below
const timer = chain.metrics?.peerDas.batchColumnVerificationTimeInSec.startTimer();
validateDataColumnsSidecars(blockSlot, beaconBlockRoot, blobKzgCommitments, dataColumns, {skipProofsCheck});
timer?.();
}

const availableBlockInput = getBlockInput.availableData(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,11 @@ export async function validateGossipDataColumnSidecar(
});
}

if (!validateInclusionProof(dataColumnSideCar)) {
const timer = chain.metrics?.peerDas.inclusionProofVerificationTimeInSec.startTimer();
const isValid = validateInclusionProof(dataColumnSideCar);
timer?.();

if (!isValid) {
throw new DataColumnSidecarGossipError(GossipAction.REJECT, {
code: DataColumnSidecarErrorCode.INCLUSION_PROOF_INVALID,
slot: dataColumnSideCar.signedBlockHeader.message.slot,
Expand Down
67 changes: 67 additions & 0 deletions packages/beacon-node/src/metrics/metrics/lodestar.ts
Original file line number Diff line number Diff line change
Expand Up @@ -765,6 +765,7 @@ export function createLodestarMetrics(
labelNames: ["error"],
}),
},

gossipBlob: {
recvToValidation: register.histogram({
name: "lodestar_gossip_blob_received_to_gossip_validate",
Expand All @@ -777,6 +778,72 @@ export function createLodestarMetrics(
buckets: [0.05, 0.1, 0.2, 0.5, 1, 1.5, 2, 4],
}),
},

// Metrics for data column sidecars handled on the gossip path:
// latency / verification histograms plus processing request and success tallies.
gossipColumn: {
recvToValidation: register.histogram({
name: "beacon_data_column_sidecar_gossip_received_until_validation_seconds",
// Help text previously said "blob" (copied from the gossipBlob metric); this metric tracks data column sidecars
help: "Time elapsed between data column sidecar received and data column sidecar validation",
buckets: [0.05, 0.1, 0.2, 0.5, 1, 1.5, 2, 4],
}),
/**
* All of these column metrics that are prefixed with `beacon_` are part of the
* ethPandaOps official metrics set
* https://github.com/KatyaRyazantseva/beacon-metrics/blob/master/metrics.md#peerdas-metrics
*/
validationTimeInSec: register.histogram({
name: "beacon_data_column_sidecar_gossip_verification_seconds",
help: "Full runtime of data column sidecars gossip verification",
// TODO: (@g11tech) need to verify that these buckets are correct; they were copied from the metrics above
buckets: [0.05, 0.1, 0.2, 0.5, 1, 1.5, 2, 4],
}),
// NOTE(review): name ends in `_total` (Prometheus counter convention) but this is registered
// as a gauge - confirm whether a counter is intended
processRequestCount: register.gauge({
name: "beacon_data_column_sidecar_processing_requests_total",
help: "Number of data column sidecars submitted for processing",
}),
// NOTE(review): same `_total` / gauge mismatch as processRequestCount - confirm
processSuccessCount: register.gauge({
name: "beacon_data_column_sidecar_processing_successes_total",
help: "Number of data column sidecars verified for gossip",
}),
},

// PeerDAS metrics: timing histograms for column reconstruction, sidecar computation,
// inclusion proof and batched KZG verification, plus a gauge for the custody column count.
peerDas: {
/**
* All of these column metrics that are prefixed with `beacon_` are part of the
* ethPandaOps official metrics set
* https://github.com/KatyaRyazantseva/beacon-metrics/blob/master/metrics.md#peerdas-metrics
*/
matrixReconstructionTimeInSec: register.histogram({
name: "beacon_data_availability_reconstruction_time_seconds",
help: "Time taken to reconstruct columns",
buckets: [0.05, 0.1, 0.2, 0.5, 1, 1.5, 2, 4],
}),
sidecarComputationTimeInSec: register.histogram({
name: "beacon_data_column_sidecar_computation_seconds",
help: "Time taken to compute data column sidecar, including cells, proofs and inclusion proof",
buckets: [0.05, 0.1, 0.2, 0.5, 1, 1.5, 2, 4],
}),
inclusionProofVerificationTimeInSec: register.histogram({
name: "beacon_data_column_sidecar_inclusion_proof_verification_seconds",
help: "Time taken to verify data column sidecar inclusion proof",
buckets: [0.05, 0.1, 0.2, 0.5, 1, 1.5, 2, 4],
}),
// Commented out for now: there is nowhere that does single column verification currently
// singleColumnVerificationTimeInSec: register.histogram({
// name: "beacon_kzg_verification_data_column_single_seconds",
// help: "Runtime of single data column kzg verification",
// buckets: [0.05, 0.1, 0.2, 0.5, 1, 1.5, 2, 4],
// }),
batchColumnVerificationTimeInSec: register.histogram({
name: "beacon_kzg_verification_data_column_batch_seconds",
help: "Runtime of batched data column kzg verification",
buckets: [0.05, 0.1, 0.2, 0.5, 1, 1.5, 2, 4],
}),
totalCustodyColumnCount: register.gauge({
name: "beacon_custody_columns_count_total",
help: "Total count of columns in custody within the data availability boundary",
}),
},

importBlock: {
persistBlockNoSerializedDataCount: register.gauge({
name: "lodestar_import_block_persist_block_no_serialized_data_count",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -282,12 +282,14 @@ function getDefaultHandlers(modules: ValidatorFnsModules, options: GossipHandler
);

try {
metrics?.gossipColumn.processRequestCount.inc();
await validateGossipDataColumnSidecar(chain, dataColumnSidecar, gossipIndex);
metrics?.gossipColumn.processSuccessCount.inc();
const recvToValidation = Date.now() / 1000 - seenTimestampSec;
const validationTime = recvToValidation - recvToValLatency;

metrics?.gossipBlob.recvToValidation.observe(recvToValidation);
metrics?.gossipBlob.validationTime.observe(validationTime);
metrics?.gossipColumn.recvToValidation.observe(recvToValidation);
metrics?.gossipColumn.validationTimeInSec.observe(validationTime);

logger.debug("Received gossip dataColumn", {
slot: slot,
Expand Down Expand Up @@ -650,6 +652,7 @@ function getDefaultHandlers(modules: ValidatorFnsModules, options: GossipHandler

chain.emitter.emit(routes.events.EventType.attestation, signedAggregateAndProof.message.aggregate);
},

[GossipType.beacon_attestation]: async ({
gossipData,
topic,
Expand Down
5 changes: 5 additions & 0 deletions packages/reqresp/src/encoders/responseDecode.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import {
ResponseIncoming,
} from "../types.js";
import {RespStatus} from "../interface.js";
import {Metrics} from "../metrics.js";

/**
* Internal helper type to signal stream ended early
Expand All @@ -29,6 +30,8 @@ enum StreamStatus {
*/
export function responseDecode(
protocol: MixedProtocol,
protocolID: string,
metrics: Metrics | null,
cbs: {
onFirstHeader: () => void;
onFirstResponseChunk: () => void;
Expand Down Expand Up @@ -65,6 +68,8 @@ export function responseDecode(
const forkName = await readContextBytes(protocol.contextBytes, bufferedSource);
const typeSizes = protocol.responseSizes(forkName);
const chunkData = await readEncodedPayload(bufferedSource, protocol.encoding, typeSizes);
// eslint-disable-next-line @typescript-eslint/naming-convention
metrics?.responsesReceivedBytesTotalCount.inc({protocol_id: protocolID}, chunkData.length);

yield {
data: chunkData,
Expand Down
4 changes: 2 additions & 2 deletions packages/reqresp/src/encoders/responseEncode.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,14 @@ const SUCCESS_BUFFER = Buffer.from([RespStatus.SUCCESS]);
*/
export function responseEncodeSuccess(
protocol: Protocol,
cbs: {onChunk: (chunkIndex: number) => void}
cbs: {onChunk: (chunkIndex: number, chunkLength: number) => void}
): (source: AsyncIterable<ResponseOutgoing>) => AsyncIterable<Buffer> {
return async function* responseEncodeSuccessTransform(source) {
let chunkIndex = 0;

for await (const chunk of source) {
// Postfix increment, return 0 as first chunk
cbs.onChunk(chunkIndex++);
cbs.onChunk(chunkIndex++, chunk.data.length);

// <result>
yield SUCCESS_BUFFER;
Expand Down
50 changes: 50 additions & 0 deletions packages/reqresp/src/metrics.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
/* eslint-disable @typescript-eslint/naming-convention */
import {MetricsRegister} from "@lodestar/utils";

export type Metrics = ReturnType<typeof getMetrics>;
Expand All @@ -10,6 +11,55 @@ export function getMetrics(register: MetricsRegister) {
// Using function style instead of class to prevent having to re-declare all MetricsPrometheus types.

return {
requestsSentTotalCount: register.counter<{protocol_id: string}>({
// ethereum/beacon-metrics defined
name: "libp2p_rpc_requests_sent_total",
help: "Number of requests sent",
labelNames: ["protocol_id"],
}),
requestsSentBytesTotalCount: register.counter<{protocol_id: string}>({
// ethereum/beacon-metrics defined
name: "libp2p_rpc_requests_bytes_sent_total",
help: "Number of requests bytes sent",
labelNames: ["protocol_id"],
}),
requestsReceivedTotalCount: register.counter<{protocol_id: string}>({
// ethereum/beacon-metrics defined
name: "libp2p_rpc_requests_received_total",
help: "Number of requests received",
labelNames: ["protocol_id"],
}),
requestsReceivedBytesTotalCount: register.counter<{protocol_id: string}>({
// ethereum/beacon-metrics defined
name: "libp2p_rpc_requests_bytes_received_total",
help: "Number of requests bytes received",
labelNames: ["protocol_id"],
}),
responsesSentTotalCount: register.counter<{protocol_id: string}>({
// ethereum/beacon-metrics defined
name: "libp2p_rpc_responses_sent_total",
help: "Number of responses sent",
labelNames: ["protocol_id"],
}),
responsesSentBytesTotalCount: register.counter<{protocol_id: string}>({
// ethereum/beacon-metrics defined
name: "libp2p_rpc_responses_bytes_sent_total",
help: "Number of responses bytes sent",
labelNames: ["protocol_id"],
}),
responsesReceivedTotalCount: register.counter<{protocol_id: string}>({
// ethereum/beacon-metrics defined
name: "libp2p_rpc_responses_received_total",
help: "Number of responses received",
labelNames: ["protocol_id"],
}),
responsesReceivedBytesTotalCount: register.counter<{protocol_id: string}>({
// ethereum/beacon-metrics defined
name: "libp2p_rpc_responses_bytes_received_total",
help: "Number of responses bytes received",
labelNames: ["protocol_id"],
}),

outgoingRequests: register.gauge<{method: string}>({
name: "beacon_reqresp_outgoing_requests_total",
help: "Counts total requests done per method",
Expand Down
9 changes: 8 additions & 1 deletion packages/reqresp/src/request/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,11 @@ export async function* sendRequest(
}
);

// only count successful request transmissions and ignore count and body sent size on errors
/* eslint-disable @typescript-eslint/naming-convention */
metrics?.requestsSentTotalCount.inc({protocol_id: protocolId});
metrics?.requestsSentBytesTotalCount.inc({protocol_id: protocolId}, requestBody.length);
/* eslint-enable @typescript-eslint/naming-convention */
logger.debug("Req request sent", logCtx);

// For goodbye method peers may disconnect before completing the response and trigger multiple errors.
Expand Down Expand Up @@ -178,7 +183,7 @@ export async function* sendRequest(
]),

// Transforms `Buffer` chunks to yield `ResponseBody` chunks
responseDecode(protocol, {
responseDecode(protocol, protocolId, metrics, {
onFirstHeader() {
// On first byte, cancel the single use TTFB_TIMEOUT, and start RESP_TIMEOUT
clearTimeout(timeoutTTFB);
Expand All @@ -196,6 +201,8 @@ export async function* sendRequest(
// NOTE: Do not log the response, logs get extremely cluttered
// NOTE: add double space after "Req " to align log with the "Resp " log
logger.verbose("Req done", {...logCtx});
// eslint-disable-next-line @typescript-eslint/naming-convention
metrics?.responsesReceivedTotalCount.inc({protocol_id: protocolId});
} finally {
clearTimeout(timeoutTTFB);
if (timeoutRESP !== null) clearTimeout(timeoutRESP);
Expand Down
12 changes: 11 additions & 1 deletion packages/reqresp/src/response/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,11 @@ export async function handleRequest({
throw new RequestError({code: RequestErrorCode.REQUEST_RATE_LIMITED});
}

/* eslint-disable @typescript-eslint/naming-convention */
metrics?.requestsReceivedBytesTotalCount.inc({protocol_id: protocolID}, requestBody.length);
metrics?.requestsReceivedTotalCount.inc({protocol_id: protocolID});
/* eslint-enable @typescript-eslint/naming-convention */

const requestChunk: ReqRespRequest = {
data: requestBody,
version: protocol.version,
Expand All @@ -103,8 +108,10 @@ export async function handleRequest({
// Note: Not logging on each chunk since after 1 year it hasn't added any value when debugging
// onChunk(() => logger.debug("Resp sending chunk", logCtx)),
responseEncodeSuccess(protocol, {
onChunk(chunkIndex) {
onChunk(chunkIndex, chunkLength) {
if (chunkIndex === 0) timerTTFB?.();
// eslint-disable-next-line @typescript-eslint/naming-convention
metrics?.responsesSentBytesTotalCount.inc({protocol_id: protocolID}, chunkLength);
},
})
);
Expand Down Expand Up @@ -133,9 +140,12 @@ export async function handleRequest({

if (responseError !== null) {
logger.verbose("Resp error", logCtx, responseError);
// TODO: (@matthewkeil) should we have a metrics?.responseSentTotalErrorCount.inc()?
throw responseError;
} else {
// NOTE: Only log once per request to verbose, intermediate steps to debug
logger.verbose("Resp done", logCtx);
// eslint-disable-next-line @typescript-eslint/naming-convention
metrics?.responsesSentTotalCount.inc({protocol_id: protocolID});
}
}