Skip to content

Commit

Permalink
Stop saving and emmiting all the event logs of fetched txs
Browse files Browse the repository at this point in the history
  • Loading branch information
Ktl-XV committed Oct 13, 2023
1 parent 6643df4 commit 8531a81
Show file tree
Hide file tree
Showing 4 changed files with 25 additions and 16 deletions.
2 changes: 2 additions & 0 deletions src/constants.ts
Original file line number Diff line number Diff line change
Expand Up @@ -112,3 +112,5 @@ export const DEFAULT_UNISWAP_V3_FACTORY_ADDRESS = '0x1f98431c8ad98523631ae4a59f2
export const WRAP_NATIVE_EVENT_TOPIC = ['0xe1fffcc4923d04b559f4d29a8bfc6cda04eb5b0d3c460751c2402c5c5cc9109c'];
export const UNWRAP_NATIVE_EVENT_TOPIC = ['0x7fcf532c15f0a6db0bd6d0e038bea71d30d808c7d98cb3bf7268a95bf5081b65'];
export const TRANSFER_EVENT_TOPIC_0 = '0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef';

export const ZEROEX_API_AFFILIATE_SELECTOR = '869584cd';
5 changes: 3 additions & 2 deletions src/parsers/web3/parse_web3_objects.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import { BigNumber } from '@0x/utils';
import { Block, Transaction, TransactionLogs, TransactionReceipt } from '../../entities';
import { BlockWithoutTransactionData, TransactionReceipt as RawReceipt, Transaction as RawTx } from 'ethereum-types';
import { ZEROEX_API_AFFILIATE_SELECTOR } from '../../constants';

export interface RawTx1559 extends RawTx {
type: number;
Expand Down Expand Up @@ -33,8 +34,8 @@ export function parseTransaction(rawTx: RawTx1559): Transaction {
transaction.maxPriorityFeePerGas =
rawTx.maxPriorityFeePerGas === undefined ? null : new BigNumber(rawTx.maxPriorityFeePerGas);

if (transaction.input.includes('869584cd')) {
const bytesPos = rawTx.input.indexOf('869584cd');
if (transaction.input.includes(ZEROEX_API_AFFILIATE_SELECTOR)) {
const bytesPos = rawTx.input.indexOf(ZEROEX_API_AFFILIATE_SELECTOR);
transaction.affiliateAddress = '0x'.concat(rawTx.input.slice(bytesPos + 32, bytesPos + 72));
const quoteId = rawTx.input.slice(bytesPos + 104, bytesPos + 136);
if (quoteId.slice(0, 14) === '00000000000000') {
Expand Down
10 changes: 5 additions & 5 deletions src/scripts/pull_and_save_backfill_tx.ts
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ export class BackfillTxScraper {
const txHashList = txData.parsedTxs.map((tx) => `'${tx.transactionHash}'`).toString();
const txDeleteQuery = `DELETE FROM ${SCHEMA}.transactions WHERE transaction_hash IN (${txHashList})`;
const txReceiptDeleteQuery = `DELETE FROM ${SCHEMA}.transaction_receipts WHERE transaction_hash IN (${txHashList});`;
const txLogsDeleteQuery = `DELETE FROM ${SCHEMA}.transaction_logs WHERE transaction_hash IN (${txHashList});`;
// const txLogsDeleteQuery = `DELETE FROM ${SCHEMA}.transaction_logs WHERE transaction_hash IN (${txHashList});`;
const txBacklogQuery = `DELETE FROM ${SCHEMA}.tx_backfill WHERE transaction_hash IN (${txHashList});`;

const queryRunner = connection.createQueryRunner();
Expand All @@ -46,7 +46,7 @@ export class BackfillTxScraper {
// delete existing tx data, for safety
await queryRunner.manager.query(txDeleteQuery);
await queryRunner.manager.query(txReceiptDeleteQuery);
await queryRunner.manager.query(txLogsDeleteQuery);
// await queryRunner.manager.query(txLogsDeleteQuery);
await queryRunner.manager.query(txBacklogQuery);

for (const chunkItems of chunk(txData.parsedTxs, 300)) {
Expand All @@ -55,9 +55,9 @@ export class BackfillTxScraper {
for (const chunkItems of chunk(txData.parsedReceipts, 300)) {
await queryRunner.manager.insert(TransactionReceipt, chunkItems);
}
for (const chunkItems of chunk(txData.parsedTxLogs, 300)) {
await queryRunner.manager.insert(TransactionLogs, chunkItems);
}
// for (const chunkItems of chunk(txData.parsedTxLogs, 300)) {
// await queryRunner.manager.insert(TransactionLogs, chunkItems);
// }

// commit transaction now:
await queryRunner.commitTransaction();
Expand Down
24 changes: 15 additions & 9 deletions src/scripts/utils/web3_utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -37,13 +37,13 @@ export type TokenMetadataMap = {
export type TxDetailsType = {
parsedTxs: Transaction[];
parsedReceipts: TransactionReceipt[];
parsedTxLogs: TransactionLogs[];
// parsedTxLogs: TransactionLogs[];
};

export class TxDetails implements TxDetailsType {
parsedTxs = [];
parsedReceipts = [];
parsedTxLogs = [];
// parsedTxLogs = [];
}

export const MISSING_TRANSACTIONS = new Gauge({
Expand Down Expand Up @@ -626,7 +626,7 @@ export async function getParseTxsAsync(

const parsedTxs = foundTxs.map((rawTxn) => parseTransaction(rawTxn));
const parsedReceipts = foundTxReceipts.map((rawTxReceipt) => parseTransactionReceipt(rawTxReceipt));
const parsedTxLogs = foundTxReceipts.map((rawTxReceipt) => parseTransactionLogs(rawTxReceipt));
//const parsedTxLogs = foundTxReceipts.map((rawTxReceipt) => parseTransactionLogs(rawTxReceipt));

const foundHashes = foundTxReceipts.map((rawTxReceipt) => rawTxReceipt.transactionHash);
const missingHashes = dedupedHashes.filter((hash) => !foundHashes.includes(hash));
Expand All @@ -638,7 +638,11 @@ export async function getParseTxsAsync(

logger.debug(`got ${parsedReceipts.length} txs`);

return { parsedTxs, parsedReceipts, parsedTxLogs };
return {
parsedTxs,
parsedReceipts,
//parsedTxLogs
};
}

export async function getParseSaveTxAsync(
Expand All @@ -654,7 +658,7 @@ export async function getParseSaveTxAsync(
const txHashList = txData.parsedTxs.map((tx) => `'${tx.transactionHash}'`).toString();
const txDeleteQuery = `DELETE FROM ${SCHEMA}.transactions WHERE transaction_hash IN (${txHashList})`;
const txReceiptDeleteQuery = `DELETE FROM ${SCHEMA}.transaction_receipts WHERE transaction_hash IN (${txHashList});`;
const txLogsDeleteQuery = `DELETE FROM ${SCHEMA}.transaction_logs WHERE transaction_hash IN (${txHashList});`;
// const txLogsDeleteQuery = `DELETE FROM ${SCHEMA}.transaction_logs WHERE transaction_hash IN (${txHashList});`;
if (txData.parsedTxs.length) {
// delete the transactions for the fetched events
const queryRunner = connection.createQueryRunner();
Expand All @@ -663,17 +667,17 @@ export async function getParseSaveTxAsync(

await queryRunner.manager.query(txDeleteQuery);
await queryRunner.manager.query(txReceiptDeleteQuery);
await queryRunner.manager.query(txLogsDeleteQuery);
//await queryRunner.manager.query(txLogsDeleteQuery);

for (const chunkItems of chunk(txData.parsedTxs, 300)) {
await queryRunner.manager.insert(Transaction, chunkItems);
}
for (const chunkItems of chunk(txData.parsedReceipts, 300)) {
await queryRunner.manager.insert(TransactionReceipt, chunkItems);
}
for (const chunkItems of chunk(txData.parsedTxLogs, 300)) {
await queryRunner.manager.insert(TransactionLogs, chunkItems);
}
// for (const chunkItems of chunk(txData.parsedTxLogs, 300)) {
// await queryRunner.manager.insert(TransactionLogs, chunkItems);
// }

await queryRunner.commitTransaction();
queryRunner.release();
Expand All @@ -690,12 +694,14 @@ export async function getParseSaveTxAsync(
['transactionHash'],
txData.parsedReceipts,
);
/*
await kafkaSendRawAsync(
producer,
`event-scraper.${CHAIN_NAME_LOWER}.transactions.logs.v0`,
['transactionHash'],
txData.parsedTxLogs,
);
*/
}
logger.info(`Saved ${txData.parsedTxs.length} Transactions`);
}

0 comments on commit 8531a81

Please sign in to comment.