Skip to content

Commit

Permalink
fix: producer queue memory heap issue
Browse files Browse the repository at this point in the history
  • Loading branch information
nitinmittal23 committed Sep 11, 2023
1 parent abf527f commit 10481d0
Show file tree
Hide file tree
Showing 11 changed files with 137 additions and 37 deletions.
2 changes: 1 addition & 1 deletion internal/block_getters/erigon_block_getter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ export class ErigonBlockGetter extends BlockGetter implements IBlockGetter {
for (const transactionObject of result[0].transactions) {
transactions.push(this.formatTransactionObject(
transactionObject as IWeb3Transaction,
this.formatRawReceipt(result[1].find(
this.formatRawReceipt(result[1]?.find(
(receipt) => receipt.transactionHash === transactionObject.hash
)) ??
await this.getTransactionReceipt(transactionObject.hash)
Expand Down
26 changes: 21 additions & 5 deletions internal/block_getters/quicknode_block_getter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import { ITransaction } from "../interfaces/transaction.js";
import { IBlock } from "../interfaces/block.js";
import { IBlockGetter } from "../interfaces/block_getter.js";
import { BlockGetter } from "./block_getter.js";
import { Eth } from "web3-eth";

/**
* A wrapper class on web3 to get blocks from quicknode and format them.
Expand All @@ -13,6 +14,16 @@ import { BlockGetter } from "./block_getter.js";
*/
export class QuickNodeBlockGetter extends BlockGetter implements IBlockGetter {

/**
* @param {Eth} eth - Eth module from web3.js
* @param {number} maxRetries - The number of times to retry on errors.
*
* @constructor
*/
constructor(eth: Eth, maxRetries: number = 0, private alternateEth?: Eth) {
super(eth, maxRetries);
}

/**
* @async
* Public method to query block data including transaction receipts of a single block.
Expand All @@ -28,12 +39,17 @@ export class QuickNodeBlockGetter extends BlockGetter implements IBlockGetter {
const response: IQuickNodeResponse = await new Promise((resolve, reject) => {
const timeout = setTimeout(() => {
reject(new Error(`Request timed out for block: ${blockNumber}`));
}, 45000);

(this.eth.currentProvider as WebsocketProvider).send({
}, 4000);

let eth: Eth = this.eth;
if (retryCount > 0 && this.alternateEth) {
eth = this.alternateEth;
}

(eth.currentProvider as WebsocketProvider).send({
method: "qn_getBlockWithReceipts",
id: Date.now().toString() + blockNumber,
params: [ utils.numberToHex(blockNumber) ],
params: [utils.numberToHex(blockNumber)],
jsonrpc: "2.0"
}, (error, response) => {
if (error) {
Expand All @@ -57,7 +73,7 @@ export class QuickNodeBlockGetter extends BlockGetter implements IBlockGetter {
await this.getTransactionReceipt(transactionObject.hash)
));
}

return this.formatRawBlock(
response.block,
transactions
Expand Down
19 changes: 18 additions & 1 deletion internal/block_getters/quicknode_block_getter_worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,24 @@ const blockGetter = new QuickNodeBlockGetter(
}
)
),
workerData.maxRetries
workerData.maxRetries,
//@ts-ignore
new EthClass(
//@ts-ignore
new EthClass.providers.WebsocketProvider(
workerData.alternateEndpoint,
{
reconnect: {
auto: true
},
clientConfig: {
maxReceivedFrameSize: 1000000000,
maxReceivedMessageSize: 1000000000,
},
timeout: 45000
}
)
),
);

parentPort.on("message", async (message: {
Expand Down
14 changes: 10 additions & 4 deletions internal/block_producers/block_producer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -260,10 +260,16 @@ export class BlockProducer extends AsynchronousProducer {

try {
while (!this.mongoInsertQueue.isEmpty()) {
await this.addBlockToMongo(
this.mongoInsertQueue.front() as IProducedBlock
);
this.mongoInsertQueue.shift();
const queueLength = this.mongoInsertQueue.getLength();
if (queueLength > this.maxReOrgDepth) {
await this.addBlockToMongo(
this.mongoInsertQueue.shiftByN(queueLength - this.maxReOrgDepth) as IProducedBlock
)

Check failure on line 267 in internal/block_producers/block_producer.ts

View workflow job for this annotation

GitHub Actions / build

Missing semicolon
} else {
await this.addBlockToMongo(
this.mongoInsertQueue.shift() as IProducedBlock
)

Check failure on line 271 in internal/block_producers/block_producer.ts

View workflow job for this annotation

GitHub Actions / build

Missing semicolon
}
}
} catch (error) {
Logger.error(error as object);
Expand Down
9 changes: 6 additions & 3 deletions internal/block_subscription/abstract_block_subscription.ts
Original file line number Diff line number Diff line change
Expand Up @@ -34,15 +34,15 @@ export abstract class AbstractBlockSubscription extends Queue<IBlockGetterWorker
private lastEmittedBlock?: {
number: number,
hash: string
};
}

Check failure on line 37 in internal/block_subscription/abstract_block_subscription.ts

View workflow job for this annotation

GitHub Actions / build

Missing semicolon

/**
* @constructor
*
* @param {Eth} eth - Eth module from web3.js
* @param {number} timeout - Timeout for which if there has been no event, connection must be restarted.
*/
constructor(private eth: Eth, private timeout: number = 60000) {
constructor(private eth: Eth, private timeout: number = 60000, private blockDelay: number = 0) {
super();
}

Expand All @@ -59,11 +59,14 @@ export abstract class AbstractBlockSubscription extends Queue<IBlockGetterWorker
*/
public async subscribe(observer: IObserver<IBlock, BlockProducerError>, startBlock: number): Promise<void> {
try {

this.lastFinalizedBlock = this.blockDelay > 0
? (await this.eth.getBlock('latest')).number - this.blockDelay

Check failure on line 64 in internal/block_subscription/abstract_block_subscription.ts

View workflow job for this annotation

GitHub Actions / build

Strings must use doublequote
: (await this.eth.getBlock('finalized')).number;

Check failure on line 65 in internal/block_subscription/abstract_block_subscription.ts

View workflow job for this annotation

GitHub Actions / build

Strings must use doublequote
//Clear any previously existing queue
this.clear();
this.observer = observer;
this.fatalError = false;
this.lastFinalizedBlock = (await this.eth.getBlock("finalized")).number;
this.nextBlock = startBlock;
this.lastBlockHash = "";
this.lastReceivedBlockNumber = startBlock - 1;
Expand Down
21 changes: 18 additions & 3 deletions internal/block_subscription/block_subscription.ts
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,11 @@ export class BlockSubscription extends AbstractBlockSubscription {
protected rpcWsEndpoints: string[] = [],
protected maxRetries: number = 0,
private blockGetterType: "quicknode_block_getter" | "erigon_block_getter" | "block_getter" = "block_getter",
timeout?: number
timeout?: number,
blockDelay?: number,
protected alternateEndpoint?: string
) {
super(eth, timeout);
super(eth, timeout, blockDelay);

this.setWorkers();
}
Expand All @@ -45,6 +47,7 @@ export class BlockSubscription extends AbstractBlockSubscription {
private setWorkers(): void {
const workers: Worker[] = [];
const workerPath: string = createRequire(import.meta.url).resolve(`../block_getters/${this.blockGetterType}_worker`);

if (!this.rpcWsEndpoints.length) {
//TODO - throw error if no rpc
return;
Expand All @@ -53,7 +56,8 @@ export class BlockSubscription extends AbstractBlockSubscription {
for (let i = 0; i < this.rpcWsEndpoints.length; i++) {
const workerData = {
endpoint: this.rpcWsEndpoints[i],
maxRetries: this.maxRetries
maxRetries: this.maxRetries,
alternateEndpoint: this.alternateEndpoint ? this.alternateEndpoint : undefined
};

const worker = new Worker(
Expand Down Expand Up @@ -208,6 +212,17 @@ export class BlockSubscription extends AbstractBlockSubscription {
blockPromise
);

// this part limit the queue length to 2500 and keep on waiting 5 seconds if
// the length is more than 2500
if (this.getLength() >= 2500) {
for (let i = 0; i < 1;) {
await new Promise(r => setTimeout(r, 5000));
if (this.getLength() < 2500) {
break;
}
}
}

try {
await blockPromise;
} catch {
Expand Down
2 changes: 2 additions & 0 deletions internal/interfaces/block_producer_config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,4 +8,6 @@ export interface IBlockProducerConfig extends IProducerConfig {
maxRetries?: number,
blockPollingTimeout?: number,
blockSubscriptionTimeout?: number,
blockDelay?: number,
alternateEndpoint?: string
}
61 changes: 47 additions & 14 deletions internal/queue/queue.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,10 @@
* A simple class that offers methods to create internal buffers and maintain them.
*/
export class Queue<T> {
private items: Array<Promise<T> | T> = [];

private items: Record<number, Promise<T> | T> = {};
private head: number = 0;
private tail: number = 0;

/**
* Public method to add an item to the queue. This class only maintains one queue and hence it is important to be careful
* while queueing different entities on the same instance un intentionally.
Expand All @@ -13,9 +15,10 @@ export class Queue<T> {
* @returns {void}
*/
public enqueue(item: Promise<T> | T): void {
this.items.push(item);
this.items[this.tail] = item;
this.tail++;
}

/**
* Returns an item from the beginning(added first) of the queue and removes it from the queue. Returns null if the queue is empty.
*
Expand All @@ -26,9 +29,38 @@ export class Queue<T> {
return null;
}

return this.items.shift() as Promise<T> | T;
const item = this.items[this.head];
delete this.items[this.head];
this.head++;

return item as Promise<T> | T;
}

/**
* Returns an item from the beginning(added first) + nth - 1 of the queue and removes it along
* with all other in front of it from the queue. Returns null if the queue is empty or less than the position.
*
* @returns {Promise<T> | T | null} - The removed the item.
*/
public shiftByN(position: number): Promise<T> | T | null {
if (this.getLength() < position && position !== 0) {
return null;
}

this.head = this.head + position - 1;
const item = this.items[this.head];
for (const key in Object.keys(this.items)) {
const numericKey = parseInt(key);

if (numericKey <= this.head) {
delete this.items[key];
}
}
this.head++;

return item as Promise<T> | T;
}

/**
* Returns the first item from the queue without removing it.
*
Expand All @@ -38,10 +70,10 @@ export class Queue<T> {
if (this.isEmpty()) {
return null;
}
return this.items[0];

return this.items[this.head];
}

/**
* Method to check if the queue is empty
*
Expand All @@ -50,23 +82,24 @@ export class Queue<T> {
public isEmpty(): boolean {
return this.getLength() == 0;
}

/**
* Method to find the length of the queue.
*
* @returns {number} - The length of the queue.
*/
public getLength(): number {
return this.items.length;
return this.tail - this.head;
}

/**
* Removes all the items from the queue.
*
* @returns {number} - The queue length after clearing.
*/
public clear(): number {
this.items.length = 0;
return this.items.length;
this.head = this.tail = 0;
this.items = {};
return 0;
}
}
12 changes: 9 additions & 3 deletions public/block_producers/quicknode_block_producer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ import Eth from "web3-eth";
*
*/
export class QuickNodeBlockProducer extends BlockProducer {

/**
* @constructor
*
Expand All @@ -30,6 +30,8 @@ export class QuickNodeBlockProducer extends BlockProducer {
const maxReOrgDepth = config.maxReOrgDepth || 0;
const maxRetries = config.maxRetries || 0;
const blockSubscriptionTimeout = config.blockSubscriptionTimeout;
const blockDelay = config.blockDelay || 0;
const alternateEndpoint = config.alternateEndpoint;

// Has to be done or Kafka complains later
delete config.rpcWsEndpoints;
Expand All @@ -38,6 +40,8 @@ export class QuickNodeBlockProducer extends BlockProducer {
delete config.maxReOrgDepth;
delete config.maxRetries;
delete config.blockSubscriptionTimeout;
delete config.blockDelay;
delete config.alternateEndpoint;

//@ts-ignore
const eth = new Eth(
Expand Down Expand Up @@ -67,7 +71,9 @@ export class QuickNodeBlockProducer extends BlockProducer {
endpoints,
maxRetries,
"quicknode_block_getter",
blockSubscriptionTimeout
blockSubscriptionTimeout,
blockDelay,
alternateEndpoint
),
new BlockGetter(eth, maxRetries),
database,
Expand All @@ -79,6 +85,6 @@ export class QuickNodeBlockProducer extends BlockProducer {
startBlock,
maxReOrgDepth
);

}
}
4 changes: 2 additions & 2 deletions tests/block_producer/block_producer.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -462,11 +462,11 @@ describe("Block Producer", () => {
return;
});

mockedQueueObject.front.mockReturnValueOnce({
mockedQueueObject.shift.mockReturnValueOnce({
number: 15400000,
hash: "0x19823dbf42b70e95e552b48e3df646d2f41b510e20b8ee1878acb18eccbefb07"
});
mockedQueueObject.front.mockReturnValueOnce({
mockedQueueObject.shift.mockReturnValueOnce({
number: 15400001,
hash: "0x19823dbf42b70e95e552b48e3df646d2f41b510e20b8ee1878acb18eccbefb07"
});
Expand Down
4 changes: 3 additions & 1 deletion tests/block_producer/quicknode_block_producer.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,9 @@ describe("Block Producer", () => {
["rpc.com", "rpc2.com"],
0,
"quicknode_block_getter",
60000
60000,
0,
undefined
);
});

Expand Down

0 comments on commit 10481d0

Please sign in to comment.