From 90ecbf5de87447657a36dfcd49a714b1b5105380 Mon Sep 17 00:00:00 2001 From: Anxo Rodriguez Date: Mon, 22 Jul 2024 10:22:22 +0100 Subject: [PATCH] Throttle block consumption (#156) # Description This PR adds a new optional config parameter to each network, allowing us to define some block throttling. the setting is called `processEveryNumBlocks` : * Throttle block processing to only process blocks every N blocks. Set to 1 to process every block, 2 to process every other block, etc. Also, note that even if we skip a block, it doesn't mean we don't have any work on it. We still need to check if there's new orders coming in. So what is actually skipped is only the programmatic order checks and posting orders. Also, it persists all Prometheus metrics, so it should count as a "consumed block" for our consumption rate alerts, meaning we should see the block consumption rate progressing faster. image This is an example on how to use it: `"processEveryNumBlocks": 40,` This would do polling and post orders once every 40 blocks. In arbitrum this should be every 8 seconds ```yaml { "networks": [ { "name": "arbitrum-one", "rpc": "wss://your-rpc/arbitrum", "deploymentBlock": 204704802, "processEveryNumBlocks": 40, "filterPolicy": { "defaultAction": "DROP", "handlers": { "0x44569Cbd4E10dd5e97293337964Eff32d58ed352": "ACCEPT", "0x519BA24e959E33b3B6220CA98bd353d8c2D89920": "ACCEPT", "0x6cF1e9cA41f7611dEf408122793c358a3d11E5a5": "ACCEPT", "0xd3338f21c89745e46af56aeaf553cf96ba9bc66f": "ACCEPT", "0xE8212F30C28B4AAB467DF3725C14d6e89C2eB967": "ACCEPT", "0xB148F40fff05b5CE6B22752cf8E454B556f7a851": "ACCEPT" } }, "watchdogTimeout": 120 } ] } ``` --- Dockerfile | 2 +- src/services/chain.ts | 59 ++++++++++++++++++++++-------------- src/types/config.schema.json | 6 ++++ src/types/types.d.ts | 4 +++ 4 files changed, 47 insertions(+), 24 deletions(-) diff --git a/Dockerfile b/Dockerfile index d1ba017..f6f2047 100644 --- a/Dockerfile +++ b/Dockerfile @@ -1,4 +1,4 @@ -FROM node:alpine AS build +FROM node:22.2-alpine AS build WORKDIR /usr/src/app diff --git a/src/services/chain.ts b/src/services/chain.ts index 95ed594..e431f67 100644 --- a/src/services/chain.ts +++ b/src/services/chain.ts @@ -77,6 +77,8 @@ export class ChainContext { readonly dryRun: boolean; readonly watchdogTimeout: number; readonly addresses?: string[]; + readonly processEveryNumBlocks: number; + private sync: ChainSync = ChainSync.SYNCING; static chains: Chains = {}; @@ -106,6 +108,7 @@ export class ChainContext { this.deploymentBlock = deploymentBlock; this.pageSize = pageSize ?? PAGE_SIZE_DEFAULT; this.dryRun = dryRun; + this.processEveryNumBlocks = options.processEveryNumBlocks ?? 1; this.watchdogTimeout = watchdogTimeout ?? WATCHDOG_TIMEOUT_DEFAULT_SECS; this.addresses = owners; @@ -168,7 +171,7 @@ export class ChainContext { * @returns the run promises for what needs to be watched */ public async warmUp(oneShot?: boolean) { - const { provider, chainId } = this; + const { provider, chainId, processEveryNumBlocks } = this; const log = getLogger("chainContext:warmUp", chainId.toString()); let { lastProcessedBlock } = this.registry; const { pageSize } = this; @@ -213,7 +216,13 @@ export class ChainContext { log.info( `🔄 Start sync with from block ${fromBlock} to ${toBlock}. Pending ${ toBlock - fromBlock - } blocks (~${Math.ceil((toBlock - fromBlock) / pageSize)} pages)` + } blocks (~${Math.ceil( + (toBlock - fromBlock) / pageSize + )} pages, processing every ${ + processEveryNumBlocks > 1 + ? processEveryNumBlocks + " blocks" + : "block" + })` ); } @@ -311,9 +320,8 @@ export class ChainContext { let lastBlockReceived = lastProcessedBlock; provider.on("block", async (blockNumber: number) => { try { - const block = await provider.getBlock(blockNumber); - log.debug(`New block ${blockNumber}`); + const block = await provider.getBlock(blockNumber); // Set the block time metric const _blockTime = block.timestamp - lastBlockReceived.timestamp; @@ -434,7 +442,7 @@ async function processBlock( blockNumberOverride?: number, blockTimestampOverride?: number ) { - const { provider, chainId } = context; + const { provider, chainId, processEveryNumBlocks } = context; const timer = metrics.processBlockDurationSeconds .labels(context.chainId.toString()) .startTimer(); @@ -463,24 +471,29 @@ async function processBlock( } } - // run action - const result = await checkForAndPlaceOrder( - context, - block, - blockNumberOverride, - blockTimestampOverride - ) - .then(() => true) - .catch(() => { - hasErrors = true; - log.error(`Error running "checkForAndPlaceOrder" action`); - return false; - }); - log.debug( - `Result of "checkForAndPlaceOrder" action for block ${ - block.number - }: ${_formatResult(result)}` - ); + // Decide if we should process this block + const shouldProcessBlock = block.number % processEveryNumBlocks === 0; + + // Check programmatic orders and place orders if necessary + if (shouldProcessBlock) { + const result = await checkForAndPlaceOrder( + context, + block, + blockNumberOverride, + blockTimestampOverride + ) + .then(() => true) + .catch(() => { + hasErrors = true; + log.error(`Error running "checkForAndPlaceOrder" action`); + return false; + }); + log.debug( + `Result of "checkForAndPlaceOrder" action for block ${ + block.number + }: ${_formatResult(result)}` + ); + } timer(); if (hasErrors) { diff --git a/src/types/config.schema.json b/src/types/config.schema.json index 0fa1078..94f5706 100644 --- a/src/types/config.schema.json +++ b/src/types/config.schema.json @@ -20,6 +20,12 @@ "watchdogTimeout": { "type": "integer" }, + "processEveryNumBlocks": { + "type": "integer", + "minimum": 1, + "description": "Throttle block processing to only process blocks every N blocks. Set to 1 to process every block (default), 2 to process every other block, etc.", + "default": 1 + }, "orderBookApi": { "type": "string", "format": "uri" diff --git a/src/types/types.d.ts b/src/types/types.d.ts index ad4b8d5..e7a35f0 100644 --- a/src/types/types.d.ts +++ b/src/types/types.d.ts @@ -13,6 +13,10 @@ export interface Config { rpc: string; deploymentBlock: number; watchdogTimeout?: number; + /** + * Throttle block processing to only process blocks every N blocks. Set to 1 to process every block, 2 to process every other block, etc. + */ + processEveryNumBlocks?: number; orderBookApi?: string; pageSize?: number; filterPolicy: {