Allow partial fetch
Ktl-XV committed Jan 17, 2024
1 parent 710e03a · commit f78b630
Showing 2 changed files with 96 additions and 55 deletions.
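In short: getParseSaveBlocksTransactionsEvents now takes an allowPartialSuccess flag and returns a boolean indicating whether the batch was fully processed. When receipts are missing only for the newest block(s) of a batch, the available blocks are still saved and the tail is retried on the next run; a missing receipt in the middle of the batch aborts the run. A standalone sketch of this retry rule follows the diff.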
7 changes: 3 additions & 4 deletions src/index.ts
@@ -8,6 +8,7 @@ import {
     KAFKA_AUTH_USER,
     KAFKA_BROKERS,
     KAFKA_SSL,
+    SCRAPER_MODE,
     SECONDS_BETWEEN_RUNS,
 } from './config';
 import * as ormConfig from './ormconfig';
@@ -68,8 +69,6 @@ if (ENABLE_PROMETHEUS_METRICS) {
     startMetricsServer();
 }
 
-const SCRAPER_MODE = 'Blocks';
-
 chainIdChecker.checkChainId(CHAIN_ID);
 
 // run pull and save events
@@ -86,10 +85,10 @@ createConnection(ormConfig as ConnectionOptions)
         if (FEAT_UNISWAP_V3_POOL_CREATED_EVENT) {
             await UniV3PoolSingleton.initInstance(connection);
         }
-        if (SCRAPER_MODE === 'Blocks') {
+        if (SCRAPER_MODE === 'BLOCKS') {
             schedule(connection, producer, blockEventsScraper.getParseSaveAsync, 'Pull and Save Blocks and Events');
             schedule(connection, producer, blockEventsScraper.backfillAsync, 'Backfill Blocks and Events');
-        } else if (SCRAPER_MODE === 'Logs') {
+        } else if (SCRAPER_MODE === 'EVENTS') {
             schedule(null, null, currentBlockMonitor.monitor, 'Current Block');
             schedule(connection, producer, blockScraper.getParseSaveEventsAsync, 'Pull and Save Blocks');
             schedule(
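With this change, SCRAPER_MODE is imported from src/config instead of being hardcoded, and the recognized values are renamed to 'BLOCKS' and 'EVENTS'. The config file is not shown in this commit; the following is only a plausible sketch of such an entry, assuming the common environment-variable pattern (the variable name and default are assumptions, not the repository's actual code):

```ts
// Hypothetical src/config entry (not part of this commit): read the
// scraper mode from the environment, defaulting to block scraping.
export const SCRAPER_MODE: string = process.env.SCRAPER_MODE || 'BLOCKS';
```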
144 changes: 93 additions & 51 deletions src/scripts/pull_and_save_block_events.ts
@@ -237,52 +237,86 @@ async function getParseSaveBlocksTransactionsEvents(
     connection: Connection,
     producer: Producer | null,
     newBlocks: EVMBlock[],
-) {
+    allowPartialSuccess: boolean,
+): Promise<boolean> {
     const blockNumbers = newBlocks.map((newBlock) => newBlock.number!);
 
     const blockRanges = findRanges(blockNumbers);
 
     logger.info(`Pulling Block Events for blocks: ${JSON.stringify(blockRanges)}`);
 
-    const newBlockReceipts = await web3Source.getBatchBlockReceiptsAsync(blockNumbers);
+    const newBlocksReceipts = await web3Source.getBatchBlockReceiptsAsync(blockNumbers);
 
-    const fullBlocks: FullBlock[] = newBlocks.map((newBlock, blockIndex): FullBlock => {
-        const transactionsWithLogs = newBlock.transactions.map(
-            (tx: EVMTransaction, txIndex: number): FullTransaction => {
-                if (newBlock.hash !== newBlockReceipts[blockIndex][txIndex].blockHash) {
-                    throw Error('Wrong Block hash');
-                }
-                return {
-                    ...tx,
-                    ...newBlockReceipts[blockIndex][txIndex],
-                    type: tx.type,
-                };
-            },
-        );
-        return { ...newBlocks[blockIndex], transactions: transactionsWithLogs };
-    });
-
-    const parsedFullBlocks = fullBlocks.map(parseBlockTransactionsEvents);
-
-    const eventTables = eventScrperProps
-        .filter((props) => props.enabled)
-        .map((props: EventScraperProps) => props.table);
-
-    await saveFullBlocks(connection, eventTables, parsedFullBlocks);
-
-    if (FEAT_TOKENS_FROM_TRANSFERS) {
-        const tokensFromTransfers = [
-            ...new Set(
-                newBlockReceipts
-                    .flat()
-                    .map((tx) => tx.logs)
-                    .flat()
-                    .filter((log) => log.topics.length > 0 && log.topics[0] === TRANSFER_EVENT_TOPIC_0)
-                    .map((log) => log.address),
-            ),
-        ];
-        await getParseSaveTokensAsync(connection, producer, web3Source, tokensFromTransfers);
+    const filteredNewBlocksReceipts = newBlocksReceipts.filter((blockReceipts) => blockReceipts !== null);
+
+    if (newBlocksReceipts.length !== filteredNewBlocksReceipts.length) {
+        if (!allowPartialSuccess) {
+            return false;
+        }
+        const { nullOnlyAtEnd } = newBlocksReceipts.reduce(
+            (state, blockReceipts) => {
+                if (state.hasSeenNull && blockReceipts !== null) {
+                    state.nullOnlyAtEnd = false;
+                }
+                if (blockReceipts === null) {
+                    state.hasSeenNull = true;
+                }
+                return state;
+            },
+            { hasSeenNull: false, nullOnlyAtEnd: true },
+        );
+        if (nullOnlyAtEnd) {
+            logger.info('Last block(s) receipts not found, retrying that block on the next run');
+        } else {
+            logger.error("Missing intermediate block receipts, can't continue. Retrying next run");
+            logger.error(newBlocksReceipts);
+            return false;
+        }
     }
+
+    if (filteredNewBlocksReceipts.length > 0) {
+        const fullBlocks: FullBlock[] = filteredNewBlocksReceipts.map((newBlockReceipts, blockIndex): FullBlock => {
+            const transactionsWithLogs = newBlockReceipts.map(
+                (txReceipt: EVMTransactionReceipt, txIndex: number): FullTransaction => {
+                    if (txReceipt.blockHash !== newBlocks[blockIndex].hash) {
+                        throw Error('Wrong Block hash');
+                    }
+                    return {
+                        ...newBlocks[blockIndex].transactions[txIndex],
+                        ...txReceipt,
+                        type: newBlocks[blockIndex].transactions[txIndex].type,
+                    };
+                },
+            );
+            return { ...newBlocks[blockIndex], transactions: transactionsWithLogs };
+        });
+
+        const parsedFullBlocks = fullBlocks.map(parseBlockTransactionsEvents);
+
+        const eventTables = eventScrperProps
+            .filter((props) => props.enabled)
+            .map((props: EventScraperProps) => props.table);
+
+        await saveFullBlocks(connection, eventTables, parsedFullBlocks);
+
+        if (FEAT_TOKENS_FROM_TRANSFERS) {
+            const tokensFromTransfers = [
+                ...new Set(
+                    filteredNewBlocksReceipts
+                        .flat()
+                        .map((tx) => tx.logs)
+                        .flat()
+                        .filter((log) => log.topics.length > 0 && log.topics[0] === TRANSFER_EVENT_TOPIC_0)
+                        .map((log) => log.address),
+                ),
+            ];
+            await getParseSaveTokensAsync(connection, producer, web3Source, tokensFromTransfers);
+        }
+        return true;
+    }
+    return false;
 }

@@ -304,19 +338,25 @@ export class BlockEventsScraper {
         );
 
         const newBlocks = await web3Source.getBatchBlockInfoAsync(blockNumbers, true);
-        getParseSaveBlocksTransactionsEvents(connection, producer, newBlocks);
-
-        const queryRunner = connection.createQueryRunner();
-        await queryRunner.connect();
-        await queryRunner.manager.query(
-            `DELETE FROM ${SCHEMA}.backfill_blocks
-            WHERE block_number IN (${blockNumbers.join(',')})`,
+        const success = await getParseSaveBlocksTransactionsEvents(
+            connection,
+            producer,
+            newBlocks,
+            allowPartialSuccess,
         );
-        queryRunner.release();
+        if (success) {
+            const queryRunner = connection.createQueryRunner();
+            await queryRunner.connect();
+            await queryRunner.manager.query(
+                `DELETE FROM ${SCHEMA}.backfill_blocks
+                WHERE block_number IN (${blockNumbers.join(',')})`,
+            );
+            queryRunner.release();
 
-        const endTime = new Date().getTime();
-        const scriptDurationSeconds = (endTime - startTime) / 1000;
-        SCRIPT_RUN_DURATION.set({ script: 'events-by-block-backfill' }, scriptDurationSeconds);
+            const endTime = new Date().getTime();
+            const scriptDurationSeconds = (endTime - startTime) / 1000;
+            SCRIPT_RUN_DURATION.set({ script: 'events-by-block-backfill' }, scriptDurationSeconds);
+        }
     }
     public async getParseSaveAsync(connection: Connection, producer: Producer | null): Promise<void> {
@@ -339,7 +379,7 @@ export class BlockEventsScraper {
             const firstStartBlock = Math.max(...eventScrperProps.map((props) => props.startBlock));
             logger.warn(`Going to start from block: ${firstStartBlock}`);
             const newBlocks = await web3Source.getBatchBlockInfoForRangeAsync(firstStartBlock, firstStartBlock, true);
-            getParseSaveBlocksTransactionsEvents(connection, producer, newBlocks);
+            await getParseSaveBlocksTransactionsEvents(connection, producer, newBlocks, true);
             return;
         }

@@ -383,12 +423,14 @@ export class BlockEventsScraper {
             throw Error(`Big reorg detected, of more than ${lookback}, manual intervention needed`);
         }
 
-        getParseSaveBlocksTransactionsEvents(connection, producer, newBlocks);
+        const success = await getParseSaveBlocksTransactionsEvents(connection, producer, newBlocks, true);
 
-        const endTime = new Date().getTime();
-        const scriptDurationSeconds = (endTime - startTime) / 1000;
-        SCRIPT_RUN_DURATION.set({ script: 'events-by-block' }, scriptDurationSeconds);
-
-        logger.info(`Finished pulling events block by in ${scriptDurationSeconds}`);
+        if (success) {
+            const endTime = new Date().getTime();
+            const scriptDurationSeconds = (endTime - startTime) / 1000;
+            SCRIPT_RUN_DURATION.set({ script: 'events-by-block' }, scriptDurationSeconds);
+
+            logger.info(`Finished pulling events block by block in ${scriptDurationSeconds}`);
+        }
     }
 }
