From 99464b80e7f800ffb3f03f23fc11b82b8f68bf78 Mon Sep 17 00:00:00 2001 From: kyle-layerzero <98773402+kyle-layerzero@users.noreply.github.com> Date: Mon, 1 Jul 2024 16:32:57 -0700 Subject: [PATCH] fix: use readable stream to avoid stack overflow --- adapters/stargate/hourly_blocks.csv | 2 + adapters/stargate/src/index.ts | 127 +++++++++------------------- adapters/stargate/src/sdk/lib.ts | 58 ++++++++----- 3 files changed, 78 insertions(+), 109 deletions(-) create mode 100644 adapters/stargate/hourly_blocks.csv diff --git a/adapters/stargate/hourly_blocks.csv b/adapters/stargate/hourly_blocks.csv new file mode 100644 index 00000000..cc795db9 --- /dev/null +++ b/adapters/stargate/hourly_blocks.csv @@ -0,0 +1,2 @@ +number,timestamp +5154879,1717513197 \ No newline at end of file diff --git a/adapters/stargate/src/index.ts b/adapters/stargate/src/index.ts index a64e01a3..bd71a755 100644 --- a/adapters/stargate/src/index.ts +++ b/adapters/stargate/src/index.ts @@ -1,109 +1,60 @@ import fs from "fs"; -import { write } from "fast-csv"; import csv from "csv-parser"; import path from "path"; -import { BlockData, OutputSchemaRow } from "./sdk/types"; -import { getTimestampAtBlock, getUserBalancesAtBlock } from "./sdk/lib"; - -const getData = async () => { - const blocks = [3676829]; - const csvRows: OutputSchemaRow[] = []; - - for (const block of blocks) { - const timestamp = await getTimestampAtBlock(block); - - const userBalances = await getUserTVLByBlock({ - blockNumber: block, - blockTimestamp: timestamp, - }); - - csvRows.push(...userBalances); - } - - const ws = fs.createWriteStream("outputData.csv"); - write(csvRows, { headers: true }) - .pipe(ws) - .on("finish", () => { - console.log("CSV file has been written."); - }); -}; - -const WHITELISTED_TOKEN_ADDRESS = '0xeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeee' - -export const getUserTVLByBlock = async ({ - blockNumber, - blockTimestamp, -}: BlockData): Promise => { - const positions = await getUserBalancesAtBlock(blockNumber); - - return positions.map((position) => ({ - block_number: blockNumber, - timestamp: blockTimestamp, - user_address: position.user, - token_address: WHITELISTED_TOKEN_ADDRESS, - token_balance: BigInt(position.balance), - token_symbol: "", - usd_price: 0, - })); -}; - -// getData().then(() => { -// console.log("Done"); -// }); +import { BlockData } from "./sdk/types"; +import { PositionsStream } from "./sdk/lib"; const readBlocksFromCSV = async (filePath: string): Promise => { const blocks: BlockData[] = []; - //console.log(`Reading: ${filePath}`); await new Promise((resolve, reject) => { - fs.createReadStream(filePath) - .pipe(csv({ separator: "," })) // Specify the separator as '\t' for TSV files - .on("data", (row) => { - //console.log(row); - const blockNumber = parseInt(row.number, 10); - const blockTimestamp = parseInt(row.timestamp, 10); - //console.log(`Maybe Data ${blockNumber} ${blockTimestamp}`); - if (!isNaN(blockNumber) && blockTimestamp) { - //console.log(`Valid Data`); - blocks.push({ blockNumber: blockNumber, blockTimestamp }); - } - }) - .on("end", () => { - resolve(); - }) - .on("error", (err) => { - reject(err); - }); + fs.createReadStream(filePath) + .pipe(csv({ separator: "," })) // Specify the separator as '\t' for TSV files + .on("data", (row) => { + //console.log(row); + const blockNumber = parseInt(row.number, 10); + const blockTimestamp = parseInt(row.timestamp, 10); + //console.log(`Maybe Data ${blockNumber} ${blockTimestamp}`); + if (!isNaN(blockNumber) && blockTimestamp) { + //console.log(`Valid Data`); + blocks.push({ blockNumber: blockNumber, blockTimestamp }); + } + }) + .on("end", () => { + resolve(); + }) + .on("error", (err) => { + reject(err); + }); }); - //console.log(`blocks: ${blocks.length}`); return blocks; }; readBlocksFromCSV(path.resolve(__dirname, "../hourly_blocks.csv")) .then(async (blocks) => { - console.log(blocks); - const allCsvRows: any[] = []; // Array to accumulate CSV rows for all blocks + const csvWriteStream = fs.createWriteStream(`outputData.csv`, { + flags: "w", + }); + + csvWriteStream.write( + "block_number,timestamp,user_address,token_address,token_balance,token_symbol,usd_price\n" + ); - for (const block of blocks) { - try { - const result = await getUserTVLByBlock(block); + for (const block of blocks) { + try { + const poisitionsStream = new PositionsStream(block); - // Accumulate CSV rows for all blocks - allCsvRows.push(...result); - } catch (error) { - console.error(`An error occurred for block ${block}:`, error); - } + poisitionsStream.pipe(csvWriteStream); + } catch (error) { + console.error( + `An error occurred for block ${block.blockNumber}:`, + error + ); } - const ws = fs.createWriteStream(`outputData.csv`, { - flags: "w", - }); - write(allCsvRows, {headers: true}) - .pipe(ws).on("finish", () => { - console.log(`CSV file has been written.`); - }); + } }) .catch((err) => { - console.error("Error reading CSV file:", err); -}); \ No newline at end of file + console.error("Error reading CSV file:", err); + }); diff --git a/adapters/stargate/src/sdk/lib.ts b/adapters/stargate/src/sdk/lib.ts index 204e074a..e00eafad 100644 --- a/adapters/stargate/src/sdk/lib.ts +++ b/adapters/stargate/src/sdk/lib.ts @@ -1,19 +1,31 @@ +import { Readable } from "stream"; import { SUBGRAPH_URL, client } from "./config"; import { Position } from "./types"; -export const getUserBalancesAtBlock = async (blockNumber: number) => { - const result: Position[] = []; +const WHITELISTED_TOKEN_ADDRESS = "0xeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeee"; - let skip = 0; - let fetchNext = true; +export const getTimestampAtBlock = async (blockNumber: number) => { + const block = await client.getBlock({ + blockNumber: BigInt(blockNumber), + }); + return Number(block.timestamp * 1000n); +}; + +export class PositionsStream extends Readable { + skip: string; - while (fetchNext) { + constructor(private block: { blockNumber: number; blockTimestamp: number }) { + super({ objectMode: true }); + this.skip = "0"; + } + + async _read() { const query = ` query { farmPositions( first: 1000, - where: { id_gt: ${JSON.stringify(skip)}, balance_gt: 0 }, - block: { number: ${blockNumber} }, + where: { id_gt: ${JSON.stringify(this.skip)}, balance_gt: 0 }, + block: { number: ${this.block.blockNumber} }, orderBy: id ) { id @@ -32,21 +44,25 @@ export const getUserBalancesAtBlock = async (blockNumber: number) => { const { data } = await response.json(); const { farmPositions } = data; + const { blockNumber, blockTimestamp } = this.block; + + const rows = farmPositions.map((position: Position) => + [ + blockNumber, + blockTimestamp, + position.user, + WHITELISTED_TOKEN_ADDRESS, + BigInt(position.balance), + "", + 0, + ].join(",") + ); - result.push(...farmPositions); - if (farmPositions.length < 1000) { - fetchNext = false; + if (rows.length) { + this.push(rows.join("\n")); + this.skip = farmPositions.at(-1).id; } else { - skip = farmPositions.at(-1).id; + this.push(null); } } - - return result; -}; - -export const getTimestampAtBlock = async (blockNumber: number) => { - const block = await client.getBlock({ - blockNumber: BigInt(blockNumber), - }); - return Number(block.timestamp * 1000n); -}; +}