Skip to content

Commit

Permalink
Merge pull request delta-hq#245 from kyle-layerzero/fix/stargate-stream
Browse files Browse the repository at this point in the history
stargate : bridge : use readable stream to avoid stack overflow
  • Loading branch information
0xroll authored Jul 3, 2024
2 parents 97f4248 + 99464b8 commit b90975a
Show file tree
Hide file tree
Showing 3 changed files with 78 additions and 109 deletions.
2 changes: 2 additions & 0 deletions adapters/stargate/hourly_blocks.csv
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
number,timestamp
5154879,1717513197
127 changes: 39 additions & 88 deletions adapters/stargate/src/index.ts
Original file line number Diff line number Diff line change
@@ -1,109 +1,60 @@
import fs from "fs";
import { write } from "fast-csv";
import csv from "csv-parser";
import path from "path";

import { BlockData, OutputSchemaRow } from "./sdk/types";
import { getTimestampAtBlock, getUserBalancesAtBlock } from "./sdk/lib";

const getData = async () => {
const blocks = [3676829];
const csvRows: OutputSchemaRow[] = [];

for (const block of blocks) {
const timestamp = await getTimestampAtBlock(block);

const userBalances = await getUserTVLByBlock({
blockNumber: block,
blockTimestamp: timestamp,
});

csvRows.push(...userBalances);
}

const ws = fs.createWriteStream("outputData.csv");
write(csvRows, { headers: true })
.pipe(ws)
.on("finish", () => {
console.log("CSV file has been written.");
});
};

const WHITELISTED_TOKEN_ADDRESS = '0xeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeee'

export const getUserTVLByBlock = async ({
blockNumber,
blockTimestamp,
}: BlockData): Promise<OutputSchemaRow[]> => {
const positions = await getUserBalancesAtBlock(blockNumber);

return positions.map((position) => ({
block_number: blockNumber,
timestamp: blockTimestamp,
user_address: position.user,
token_address: WHITELISTED_TOKEN_ADDRESS,
token_balance: BigInt(position.balance),
token_symbol: "",
usd_price: 0,
}));
};

// getData().then(() => {
// console.log("Done");
// });
import { BlockData } from "./sdk/types";
import { PositionsStream } from "./sdk/lib";

const readBlocksFromCSV = async (filePath: string): Promise<BlockData[]> => {
const blocks: BlockData[] = [];
//console.log(`Reading: ${filePath}`);

await new Promise<void>((resolve, reject) => {
fs.createReadStream(filePath)
.pipe(csv({ separator: "," })) // Specify the separator as '\t' for TSV files
.on("data", (row) => {
//console.log(row);
const blockNumber = parseInt(row.number, 10);
const blockTimestamp = parseInt(row.timestamp, 10);
//console.log(`Maybe Data ${blockNumber} ${blockTimestamp}`);
if (!isNaN(blockNumber) && blockTimestamp) {
//console.log(`Valid Data`);
blocks.push({ blockNumber: blockNumber, blockTimestamp });
}
})
.on("end", () => {
resolve();
})
.on("error", (err) => {
reject(err);
});
fs.createReadStream(filePath)
.pipe(csv({ separator: "," })) // Specify the separator as '\t' for TSV files
.on("data", (row) => {
//console.log(row);
const blockNumber = parseInt(row.number, 10);
const blockTimestamp = parseInt(row.timestamp, 10);
//console.log(`Maybe Data ${blockNumber} ${blockTimestamp}`);
if (!isNaN(blockNumber) && blockTimestamp) {
//console.log(`Valid Data`);
blocks.push({ blockNumber: blockNumber, blockTimestamp });
}
})
.on("end", () => {
resolve();
})
.on("error", (err) => {
reject(err);
});
});

//console.log(`blocks: ${blocks.length}`);
return blocks;
};

readBlocksFromCSV(path.resolve(__dirname, "../hourly_blocks.csv"))
.then(async (blocks) => {
console.log(blocks);
const allCsvRows: any[] = []; // Array to accumulate CSV rows for all blocks
const csvWriteStream = fs.createWriteStream(`outputData.csv`, {
flags: "w",
});

csvWriteStream.write(
"block_number,timestamp,user_address,token_address,token_balance,token_symbol,usd_price\n"
);

for (const block of blocks) {
try {
const result = await getUserTVLByBlock(block);
for (const block of blocks) {
try {
const poisitionsStream = new PositionsStream(block);

// Accumulate CSV rows for all blocks
allCsvRows.push(...result);
} catch (error) {
console.error(`An error occurred for block ${block}:`, error);
}
poisitionsStream.pipe(csvWriteStream);
} catch (error) {
console.error(
`An error occurred for block ${block.blockNumber}:`,
error
);
}
const ws = fs.createWriteStream(`outputData.csv`, {
flags: "w",
});
write(allCsvRows, {headers: true})
.pipe(ws).on("finish", () => {
console.log(`CSV file has been written.`);
});
}
})
.catch((err) => {
console.error("Error reading CSV file:", err);
});
console.error("Error reading CSV file:", err);
});
58 changes: 37 additions & 21 deletions adapters/stargate/src/sdk/lib.ts
Original file line number Diff line number Diff line change
@@ -1,19 +1,31 @@
import { Readable } from "stream";
import { SUBGRAPH_URL, client } from "./config";
import { Position } from "./types";

export const getUserBalancesAtBlock = async (blockNumber: number) => {
const result: Position[] = [];
const WHITELISTED_TOKEN_ADDRESS = "0xeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeee";

let skip = 0;
let fetchNext = true;
export const getTimestampAtBlock = async (blockNumber: number) => {
const block = await client.getBlock({
blockNumber: BigInt(blockNumber),
});
return Number(block.timestamp * 1000n);
};

export class PositionsStream extends Readable {
skip: string;

while (fetchNext) {
constructor(private block: { blockNumber: number; blockTimestamp: number }) {
super({ objectMode: true });
this.skip = "0";
}

async _read() {
const query = `
query {
farmPositions(
first: 1000,
where: { id_gt: ${JSON.stringify(skip)}, balance_gt: 0 },
block: { number: ${blockNumber} },
where: { id_gt: ${JSON.stringify(this.skip)}, balance_gt: 0 },
block: { number: ${this.block.blockNumber} },
orderBy: id
) {
id
Expand All @@ -32,21 +44,25 @@ export const getUserBalancesAtBlock = async (blockNumber: number) => {

const { data } = await response.json();
const { farmPositions } = data;
const { blockNumber, blockTimestamp } = this.block;

const rows = farmPositions.map((position: Position) =>
[
blockNumber,
blockTimestamp,
position.user,
WHITELISTED_TOKEN_ADDRESS,
BigInt(position.balance),
"",
0,
].join(",")
);

result.push(...farmPositions);
if (farmPositions.length < 1000) {
fetchNext = false;
if (rows.length) {
this.push(rows.join("\n"));
this.skip = farmPositions.at(-1).id;
} else {
skip = farmPositions.at(-1).id;
this.push(null);
}
}

return result;
};

export const getTimestampAtBlock = async (blockNumber: number) => {
const block = await client.getBlock({
blockNumber: BigInt(blockNumber),
});
return Number(block.timestamp * 1000n);
};
}

0 comments on commit b90975a

Please sign in to comment.