From 10481d0fea649af2714049254d51d00c85e20597 Mon Sep 17 00:00:00 2001 From: Nitin Mittal Date: Mon, 11 Sep 2023 21:24:32 +0400 Subject: [PATCH] fix: producer queue memory heap issue --- internal/block_getters/erigon_block_getter.ts | 2 +- .../block_getters/quicknode_block_getter.ts | 26 ++++++-- .../quicknode_block_getter_worker.ts | 19 +++++- internal/block_producers/block_producer.ts | 14 +++-- .../abstract_block_subscription.ts | 9 ++- .../block_subscription/block_subscription.ts | 21 ++++++- internal/interfaces/block_producer_config.ts | 2 + internal/queue/queue.ts | 61 ++++++++++++++----- .../quicknode_block_producer.ts | 12 +++- tests/block_producer/block_producer.test.ts | 4 +- .../quicknode_block_producer.test.ts | 4 +- 11 files changed, 137 insertions(+), 37 deletions(-) diff --git a/internal/block_getters/erigon_block_getter.ts b/internal/block_getters/erigon_block_getter.ts index f00ff7f..c43c03b 100644 --- a/internal/block_getters/erigon_block_getter.ts +++ b/internal/block_getters/erigon_block_getter.ts @@ -38,7 +38,7 @@ export class ErigonBlockGetter extends BlockGetter implements IBlockGetter { for (const transactionObject of result[0].transactions) { transactions.push(this.formatTransactionObject( transactionObject as IWeb3Transaction, - this.formatRawReceipt(result[1].find( + this.formatRawReceipt(result[1]?.find( (receipt) => receipt.transactionHash === transactionObject.hash )) ?? await this.getTransactionReceipt(transactionObject.hash) diff --git a/internal/block_getters/quicknode_block_getter.ts b/internal/block_getters/quicknode_block_getter.ts index d1405d5..a06aa43 100644 --- a/internal/block_getters/quicknode_block_getter.ts +++ b/internal/block_getters/quicknode_block_getter.ts @@ -5,6 +5,7 @@ import { ITransaction } from "../interfaces/transaction.js"; import { IBlock } from "../interfaces/block.js"; import { IBlockGetter } from "../interfaces/block_getter.js"; import { BlockGetter } from "./block_getter.js"; +import { Eth } from "web3-eth"; /** * A wrapper class on web3 to get blocks from quicknode and format them. @@ -13,6 +14,16 @@ import { BlockGetter } from "./block_getter.js"; */ export class QuickNodeBlockGetter extends BlockGetter implements IBlockGetter { + /** + * @param {Eth} eth - Eth module from web3.js + * @param {number} maxRetries - The number of times to retry on errors. + * + * @constructor + */ + constructor(eth: Eth, maxRetries: number = 0, private alternateEth?: Eth) { + super(eth, maxRetries); + } + /** * @async * Public method to query block data including transaction receipts of a single block. @@ -28,12 +39,17 @@ export class QuickNodeBlockGetter extends BlockGetter implements IBlockGetter { const response: IQuickNodeResponse = await new Promise((resolve, reject) => { const timeout = setTimeout(() => { reject(new Error(`Request timed out for block: ${blockNumber}`)); - }, 45000); - - (this.eth.currentProvider as WebsocketProvider).send({ + }, 4000); + + let eth: Eth = this.eth; + if (retryCount > 0 && this.alternateEth) { + eth = this.alternateEth; + } + + (eth.currentProvider as WebsocketProvider).send({ method: "qn_getBlockWithReceipts", id: Date.now().toString() + blockNumber, - params: [ utils.numberToHex(blockNumber) ], + params: [utils.numberToHex(blockNumber)], jsonrpc: "2.0" }, (error, response) => { if (error) { @@ -57,7 +73,7 @@ export class QuickNodeBlockGetter extends BlockGetter implements IBlockGetter { await this.getTransactionReceipt(transactionObject.hash) )); } - + return this.formatRawBlock( response.block, transactions diff --git a/internal/block_getters/quicknode_block_getter_worker.ts b/internal/block_getters/quicknode_block_getter_worker.ts index 6f23612..c3122d6 100644 --- a/internal/block_getters/quicknode_block_getter_worker.ts +++ b/internal/block_getters/quicknode_block_getter_worker.ts @@ -25,7 +25,24 @@ const blockGetter = new QuickNodeBlockGetter( } ) ), - workerData.maxRetries + workerData.maxRetries, + //@ts-ignore + new EthClass( + //@ts-ignore + new EthClass.providers.WebsocketProvider( + workerData.alternateEndpoint, + { + reconnect: { + auto: true + }, + clientConfig: { + maxReceivedFrameSize: 1000000000, + maxReceivedMessageSize: 1000000000, + }, + timeout: 45000 + } + ) + ), ); parentPort.on("message", async (message: { diff --git a/internal/block_producers/block_producer.ts b/internal/block_producers/block_producer.ts index 742db4c..80be96b 100644 --- a/internal/block_producers/block_producer.ts +++ b/internal/block_producers/block_producer.ts @@ -260,10 +260,16 @@ export class BlockProducer extends AsynchronousProducer { try { while (!this.mongoInsertQueue.isEmpty()) { - await this.addBlockToMongo( - this.mongoInsertQueue.front() as IProducedBlock - ); - this.mongoInsertQueue.shift(); + const queueLength = this.mongoInsertQueue.getLength(); + if (queueLength > this.maxReOrgDepth) { + await this.addBlockToMongo( + this.mongoInsertQueue.shiftByN(queueLength - this.maxReOrgDepth) as IProducedBlock + ) + } else { + await this.addBlockToMongo( + this.mongoInsertQueue.shift() as IProducedBlock + ) + } } } catch (error) { Logger.error(error as object); diff --git a/internal/block_subscription/abstract_block_subscription.ts b/internal/block_subscription/abstract_block_subscription.ts index 2a5f5e3..25ef980 100644 --- a/internal/block_subscription/abstract_block_subscription.ts +++ b/internal/block_subscription/abstract_block_subscription.ts @@ -34,7 +34,7 @@ export abstract class AbstractBlockSubscription extends Queue, startBlock: number): Promise { try { + + this.lastFinalizedBlock = this.blockDelay > 0 + ? (await this.eth.getBlock('latest')).number - this.blockDelay + : (await this.eth.getBlock('finalized')).number; //Clear any previously existing queue this.clear(); this.observer = observer; this.fatalError = false; - this.lastFinalizedBlock = (await this.eth.getBlock("finalized")).number; this.nextBlock = startBlock; this.lastBlockHash = ""; this.lastReceivedBlockNumber = startBlock - 1; diff --git a/internal/block_subscription/block_subscription.ts b/internal/block_subscription/block_subscription.ts index 97cdfcb..fb8fa84 100644 --- a/internal/block_subscription/block_subscription.ts +++ b/internal/block_subscription/block_subscription.ts @@ -30,9 +30,11 @@ export class BlockSubscription extends AbstractBlockSubscription { protected rpcWsEndpoints: string[] = [], protected maxRetries: number = 0, private blockGetterType: "quicknode_block_getter" | "erigon_block_getter" | "block_getter" = "block_getter", - timeout?: number + timeout?: number, + blockDelay?: number, + protected alternateEndpoint?: string ) { - super(eth, timeout); + super(eth, timeout, blockDelay); this.setWorkers(); } @@ -45,6 +47,7 @@ export class BlockSubscription extends AbstractBlockSubscription { private setWorkers(): void { const workers: Worker[] = []; const workerPath: string = createRequire(import.meta.url).resolve(`../block_getters/${this.blockGetterType}_worker`); + if (!this.rpcWsEndpoints.length) { //TODO - throw error if no rpc return; @@ -53,7 +56,8 @@ export class BlockSubscription extends AbstractBlockSubscription { for (let i = 0; i < this.rpcWsEndpoints.length; i++) { const workerData = { endpoint: this.rpcWsEndpoints[i], - maxRetries: this.maxRetries + maxRetries: this.maxRetries, + alternateEndpoint: this.alternateEndpoint ? this.alternateEndpoint : undefined }; const worker = new Worker( @@ -208,6 +212,17 @@ export class BlockSubscription extends AbstractBlockSubscription { blockPromise ); + // this part limit the queue length to 2500 and keep on waiting 5 seconds if + // the length is more than 2500 + if (this.getLength() >= 2500) { + for (let i = 0; i < 1;) { + await new Promise(r => setTimeout(r, 5000)); + if (this.getLength() < 2500) { + break; + } + } + } + try { await blockPromise; } catch { diff --git a/internal/interfaces/block_producer_config.ts b/internal/interfaces/block_producer_config.ts index e625c50..81a6ef7 100644 --- a/internal/interfaces/block_producer_config.ts +++ b/internal/interfaces/block_producer_config.ts @@ -8,4 +8,6 @@ export interface IBlockProducerConfig extends IProducerConfig { maxRetries?: number, blockPollingTimeout?: number, blockSubscriptionTimeout?: number, + blockDelay?: number, + alternateEndpoint?: string } diff --git a/internal/queue/queue.ts b/internal/queue/queue.ts index 4c6a4f1..11b0c02 100644 --- a/internal/queue/queue.ts +++ b/internal/queue/queue.ts @@ -2,8 +2,10 @@ * A simple class that offers methods to create internal buffers and maintain them. */ export class Queue { - private items: Array | T> = []; - + private items: Record | T> = {}; + private head: number = 0; + private tail: number = 0; + /** * Public method to add an item to the queue. This class only maintains one queue and hence it is important to be careful * while queueing different entities on the same instance un intentionally. @@ -13,9 +15,10 @@ export class Queue { * @returns {void} */ public enqueue(item: Promise | T): void { - this.items.push(item); + this.items[this.tail] = item; + this.tail++; } - + /** * Returns an item from the beginning(added first) of the queue and removes it from the queue. Returns null if the queue is empty. * @@ -26,9 +29,38 @@ export class Queue { return null; } - return this.items.shift() as Promise | T; + const item = this.items[this.head]; + delete this.items[this.head]; + this.head++; + + return item as Promise | T; + } + + /** + * Returns an item from the beginning(added first) + nth - 1 of the queue and removes it along + * with all other in front of it from the queue. Returns null if the queue is empty or less than the position. + * + * @returns {Promise | T | null} - The removed the item. + */ + public shiftByN(position: number): Promise | T | null { + if (this.getLength() < position && position !== 0) { + return null; + } + + this.head = this.head + position - 1; + const item = this.items[this.head]; + for (const key in Object.keys(this.items)) { + const numericKey = parseInt(key); + + if (numericKey <= this.head) { + delete this.items[key]; + } + } + this.head++; + + return item as Promise | T; } - + /** * Returns the first item from the queue without removing it. * @@ -38,10 +70,10 @@ export class Queue { if (this.isEmpty()) { return null; } - - return this.items[0]; + + return this.items[this.head]; } - + /** * Method to check if the queue is empty * @@ -50,23 +82,24 @@ export class Queue { public isEmpty(): boolean { return this.getLength() == 0; } - + /** * Method to find the length of the queue. * * @returns {number} - The length of the queue. */ public getLength(): number { - return this.items.length; + return this.tail - this.head; } - + /** * Removes all the items from the queue. * * @returns {number} - The queue length after clearing. */ public clear(): number { - this.items.length = 0; - return this.items.length; + this.head = this.tail = 0; + this.items = {}; + return 0; } } diff --git a/public/block_producers/quicknode_block_producer.ts b/public/block_producers/quicknode_block_producer.ts index 138d415..77155ca 100644 --- a/public/block_producers/quicknode_block_producer.ts +++ b/public/block_producers/quicknode_block_producer.ts @@ -14,7 +14,7 @@ import Eth from "web3-eth"; * */ export class QuickNodeBlockProducer extends BlockProducer { - + /** * @constructor * @@ -30,6 +30,8 @@ export class QuickNodeBlockProducer extends BlockProducer { const maxReOrgDepth = config.maxReOrgDepth || 0; const maxRetries = config.maxRetries || 0; const blockSubscriptionTimeout = config.blockSubscriptionTimeout; + const blockDelay = config.blockDelay || 0; + const alternateEndpoint = config.alternateEndpoint; // Has to be done or Kafka complains later delete config.rpcWsEndpoints; @@ -38,6 +40,8 @@ export class QuickNodeBlockProducer extends BlockProducer { delete config.maxReOrgDepth; delete config.maxRetries; delete config.blockSubscriptionTimeout; + delete config.blockDelay; + delete config.alternateEndpoint; //@ts-ignore const eth = new Eth( @@ -67,7 +71,9 @@ export class QuickNodeBlockProducer extends BlockProducer { endpoints, maxRetries, "quicknode_block_getter", - blockSubscriptionTimeout + blockSubscriptionTimeout, + blockDelay, + alternateEndpoint ), new BlockGetter(eth, maxRetries), database, @@ -79,6 +85,6 @@ export class QuickNodeBlockProducer extends BlockProducer { startBlock, maxReOrgDepth ); - + } } diff --git a/tests/block_producer/block_producer.test.ts b/tests/block_producer/block_producer.test.ts index 24959a4..d85eb90 100644 --- a/tests/block_producer/block_producer.test.ts +++ b/tests/block_producer/block_producer.test.ts @@ -462,11 +462,11 @@ describe("Block Producer", () => { return; }); - mockedQueueObject.front.mockReturnValueOnce({ + mockedQueueObject.shift.mockReturnValueOnce({ number: 15400000, hash: "0x19823dbf42b70e95e552b48e3df646d2f41b510e20b8ee1878acb18eccbefb07" }); - mockedQueueObject.front.mockReturnValueOnce({ + mockedQueueObject.shift.mockReturnValueOnce({ number: 15400001, hash: "0x19823dbf42b70e95e552b48e3df646d2f41b510e20b8ee1878acb18eccbefb07" }); diff --git a/tests/block_producer/quicknode_block_producer.test.ts b/tests/block_producer/quicknode_block_producer.test.ts index cfcd4f2..49653b8 100644 --- a/tests/block_producer/quicknode_block_producer.test.ts +++ b/tests/block_producer/quicknode_block_producer.test.ts @@ -139,7 +139,9 @@ describe("Block Producer", () => { ["rpc.com", "rpc2.com"], 0, "quicknode_block_getter", - 60000 + 60000, + 0, + undefined ); });