diff --git a/docker-compose.yml b/docker-compose.yml index 6449fc21..213332bb 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -30,16 +30,17 @@ services: SCHEMA: 'events' # FEAT_EXCLUSIVE_TOKENS_FROM_TRANSACTIONS: "true" # TOKENS_FROM_TRANSACTIONS_START_BLOCK: 9193266 - KAFKA_BROKERS: '${KAFKA_BROKERS}' - KAFKA_SSL: '${KAFKA_SSL}' - KAFKA_AUTH_USER: '${KAFKA_AUTH_USER}' - KAFKA_AUTH_PASSWORD: '${KAFKA_AUTH_PASSWORD}' + #KAFKA_BROKERS: '${KAFKA_BROKERS}' + #KAFKA_SSL: '${KAFKA_SSL}' + #KAFKA_AUTH_USER: '${KAFKA_AUTH_USER}' + #KAFKA_AUTH_PASSWORD: '${KAFKA_AUTH_PASSWORD}' EP_DEPLOYMENT_BLOCK: 10247094 MAX_BLOCKS_TO_SEARCH: 1000 MAX_BLOCKS_TO_PULL: 1000 MAX_TX_TO_PULL: 1000 BLOCK_FINALITY_THRESHOLD: 0 SECONDS_BETWEEN_RUNS: 1 + RESCRAPE_BLOCKS: 10 FEAT_UNISWAP_V2_VIP_SWAP_EVENT: "true" UNISWAP_V2_VIP_SWAP_SOURCES: "UniswapV2,SushiSwap" UNISWAP_V2_VIP_SWAP_START_BLOCK: 10917104 @@ -96,6 +97,7 @@ services: MAX_BLOCKS_TO_SEARCH: 2000 MAX_BLOCKS_TO_PULL: 5000 SECONDS_BETWEEN_RUNS: 1 + RESCRAPE_BLOCKS: 10 FEAT_UNISWAP_V2_VIP_SWAP_EVENT: "true" UNISWAP_V2_VIP_SWAP_SOURCES: "PancakeSwap,BakerySwap,SushiSwap,CafeSwap,SwapLiquidity,ApeSwapFinance,CheeseSwap,Swap" FEAT_UNISWAP_V2_PAIR_CREATED_EVENT: "true" @@ -125,7 +127,8 @@ services: EP_DEPLOYMENT_BLOCK: 14391480 MAX_BLOCKS_TO_SEARCH: 1000 MAX_BLOCKS_TO_PULL: 1000 - MINUTES_BETWEEN_RUNS: 1 + SECONDS_BETWEEN_RUNS: 1 + RESCRAPE_BLOCKS: 10 FEAT_SLINGSHOT_TRADE_EVENT: "true" SLINGSHOT_DEPLOYMENT_BLOCK: 14500000 FEAT_LIMIT_ORDERS: "true" @@ -159,7 +162,8 @@ services: EP_DEPLOYMENT_BLOCK: 3601700 MAX_BLOCKS_TO_SEARCH: 5000 MAX_BLOCKS_TO_PULL: 2000 - MINUTES_BETWEEN_RUNS: 1 + SECONDS_BETWEEN_RUNS: 1 + RESCRAPE_BLOCKS: 10 FEAT_ERC20_BRIDGE_TRANSFER_FLASHWALLET: "true" FLASHWALLET_ADDRESS: "0xdb6f1920a889355780af7570773609bd8cb1f498" FLASHWALLET_DEPLOYMENT_BLOCK: 11805869 @@ -190,7 +194,8 @@ services: EP_DEPLOYMENT_BLOCK: 18855765 MAX_BLOCKS_TO_SEARCH: 2000 MAX_BLOCKS_TO_PULL: 
1000 - SECONDS_BETWEEN_RUNS: 60 + SECONDS_BETWEEN_RUNS: 1 + RESCRAPE_BLOCKS: 10 FEAT_ERC20_BRIDGE_TRANSFER_FLASHWALLET: "true" FLASHWALLET_ADDRESS: "0xb4d961671cadfed687e040b076eee29840c142e5" FLASHWALLET_DEPLOYMENT_BLOCK: 18855797 @@ -219,7 +224,8 @@ services: EP_DEPLOYMENT_BLOCK: 9350111 MAX_BLOCKS_TO_SEARCH: 5000 MAX_BLOCKS_TO_PULL: 2000 - MINUTES_BETWEEN_RUNS: 1 + SECONDS_BETWEEN_RUNS: 1 + RESCRAPE_BLOCKS: 10 FEAT_NFT: "true" NFT_FEATURE_START_BLOCK: 11820000 @@ -243,6 +249,7 @@ services: MAX_BLOCKS_TO_SEARCH: 1000 MAX_BLOCKS_TO_PULL: 1000 SECONDS_BETWEEN_RUNS: 1 + RESCRAPE_BLOCKS: 10 EP_ADDRESS: "0xdef1abe32c034e558cdd535791643c58a13acc10" FEAT_TRANSFORMED_ERC20_EVENT: "true" FEAT_NFT: "true" @@ -273,6 +280,7 @@ services: MAX_BLOCKS_TO_SEARCH: 1000 MAX_BLOCKS_TO_PULL: 1000 SECONDS_BETWEEN_RUNS: 30 + RESCRAPE_BLOCKS: 10 EP_ADDRESS: "0xdef1c0ded9bec7f1a1670819833240f027b25eff" FEAT_NFT: "true" NFT_FEATURE_START_BLOCK: 4050733 @@ -295,6 +303,7 @@ services: MAX_BLOCKS_TO_SEARCH: 1000 MAX_BLOCKS_TO_PULL: 100 SECONDS_BETWEEN_RUNS: 1 + RESCRAPE_BLOCKS: 10 FEAT_NFT: "true" NFT_FEATURE_START_BLOCK: 1410394 KAFKA_BROKERS: '${KAFKA_BROKERS}' diff --git a/src/config.ts b/src/config.ts index 9538fd25..d9620a7e 100644 --- a/src/config.ts +++ b/src/config.ts @@ -8,6 +8,7 @@ import { DEFAULT_FEAT_LIMIT_ORDERS, DEFAULT_FEAT_META_TRANSACTION_EXECUTED_EVENT, DEFAULT_FEAT_NFT, + DEFAULT_FEAT_ONCHAIN_GOVERNANCE, DEFAULT_FEAT_OTC_ORDERS, DEFAULT_FEAT_PLP_SWAP_EVENT, DEFAULT_FEAT_POLYGON_RFQM_PAYMENTS, @@ -19,26 +20,28 @@ import { DEFAULT_FEAT_UNISWAP_V2_PAIR_CREATED_EVENT, DEFAULT_FEAT_UNISWAP_V2_SYNC_EVENT, DEFAULT_FEAT_UNISWAP_V2_VIP_SWAP_EVENT, - DEFAULT_FEAT_UNISWAP_V3_VIP_SWAP_EVENT, DEFAULT_FEAT_UNISWAP_V3_SWAP_EVENT, + DEFAULT_FEAT_UNISWAP_V3_VIP_SWAP_EVENT, DEFAULT_FEAT_V3_FILL_EVENT, DEFAULT_FEAT_V3_NATIVE_FILL, + DEFAULT_FEAT_WRAP_UNWRAP_NATIVE_EVENT, + DEFAULT_FEAT_WRAP_UNWRAP_NATIVE_TRANSFER_EVENT, DEFAULT_LOCAL_POSTGRES_URI, + DEFAULT_MAX_BLOCKS_REORG, 
DEFAULT_MAX_BLOCKS_TO_PULL, DEFAULT_MAX_BLOCKS_TO_SEARCH, - DEFAULT_MAX_BLOCKS_REORG, DEFAULT_MAX_TIME_TO_SEARCH, DEFAULT_MAX_TX_TO_PULL, DEFAULT_METRICS_PATH, DEFAULT_MINUTES_BETWEEN_RUNS, DEFAULT_PROMETHEUS_PORT, + DEFAULT_RESCRAPE_BLOCKS, DEFAULT_STAKING_POOLS_JSON_URL, DEFAULT_STAKING_POOLS_METADATA_JSON_URL, - DEFAULT_FEAT_ONCHAIN_GOVERNANCE, - DEFAULT_FEAT_WRAP_UNWRAP_NATIVE_EVENT, - DEFAULT_FEAT_WRAP_UNWRAP_NATIVE_TRANSFER_EVENT, } from './constants'; +import { logger } from './utils'; + const throwError = (err: string) => { throw new Error(err); }; @@ -407,7 +410,7 @@ validateAddress( export const KAFKA_BROKERS = process.env.KAFKA_BROKERS ? process.env.KAFKA_BROKERS.split(',') : []; if (KAFKA_BROKERS.length === 0) { - throwError(`KAFKA_BROKERS is missing`); + logger.warn('KAFKA_BROKERS is empty, disabling kafka'); } export const KAFKA_AUTH_USER = process.env.KAFKA_AUTH_USER!; export const KAFKA_AUTH_PASSWORD = process.env.KAFKA_AUTH_PASSWORD!; @@ -431,6 +434,8 @@ validateStartBlock( FEAT_SOCKET_BRIDGE_EVENT, ); +export const RESCRAPE_BLOCKS = getIntConfig('RESCRAPE_BLOCKS', DEFAULT_RESCRAPE_BLOCKS); + function getBoolConfig(env: string, defaultValue: boolean): boolean { if (Object.prototype.hasOwnProperty.call(process.env, env)) { return process.env[env] === 'true'; diff --git a/src/constants.ts b/src/constants.ts index 39f5f1e6..d7e6d9c4 100644 --- a/src/constants.ts +++ b/src/constants.ts @@ -6,6 +6,7 @@ export const DEFAULT_MAX_BLOCKS_TO_PULL = 120; export const DEFAULT_MAX_BLOCKS_TO_SEARCH = 120; export const DEFAULT_MAX_TX_TO_PULL = 1000; export const DEFAULT_BLOCK_FINALITY_THRESHOLD = 10; +export const DEFAULT_RESCRAPE_BLOCKS = 0; export const DEFAULT_MINUTES_BETWEEN_RUNS = 3; export const DEFAULT_STAKING_POOLS_JSON_URL = 'https://raw.githubusercontent.com/0xProject/0x-staking-pool-registry/master/staking_pools.json'; diff --git a/src/index.ts b/src/index.ts index e575142d..7578d25c 100644 --- a/src/index.ts +++ b/src/index.ts @@ -32,20 +32,24 @@ 
import { startMetricsServer } from './utils/metrics'; import { TokenMetadataSingleton } from './tokenMetadataSingleton'; import { UniV2PoolSingleton } from './uniV2PoolSingleton'; -const kafka = new Kafka({ - clientId: 'event-pipeline', - brokers: KAFKA_BROKERS, - ssl: KAFKA_SSL, - sasl: KAFKA_SSL - ? { - mechanism: 'plain', - username: KAFKA_AUTH_USER, - password: KAFKA_AUTH_PASSWORD, - } - : undefined, -}); +let producer: Producer | null = null; -const producer = kafka.producer(); +if (KAFKA_BROKERS.length > 0) { + const kafka = new Kafka({ + clientId: 'event-pipeline', + brokers: KAFKA_BROKERS, + ssl: KAFKA_SSL, + sasl: KAFKA_SSL + ? { + mechanism: 'plain', + username: KAFKA_AUTH_USER, + password: KAFKA_AUTH_PASSWORD, + } + : undefined, + }); + + producer = kafka.producer(); +} logger.info('App is running...'); @@ -68,7 +72,9 @@ chainIdChecker.checkChainId(CHAIN_ID); // run pull and save events createConnection(ormConfig as ConnectionOptions) .then(async (connection) => { - await producer.connect(); + if (producer) { + await producer.connect(); + } await TokenMetadataSingleton.getInstance(connection, producer); if (FEAT_UNISWAP_V2_PAIR_CREATED_EVENT) { await UniV2PoolSingleton.initInstance(connection); diff --git a/src/scripts/utils/event_abi_utils.ts b/src/scripts/utils/event_abi_utils.ts index 7b9ce8ff..7e898fed 100644 --- a/src/scripts/utils/event_abi_utils.ts +++ b/src/scripts/utils/event_abi_utils.ts @@ -10,7 +10,7 @@ import { TokenMetadataMap, extractTokensFromLogs, getParseSaveTokensAsync, getPa import { RawLogEntry } from 'ethereum-types'; -import { CHAIN_NAME_LOWER, MAX_BLOCKS_REORG, MAX_BLOCKS_TO_SEARCH, SCHEMA } from '../../config'; +import { CHAIN_NAME_LOWER, MAX_BLOCKS_REORG, MAX_BLOCKS_TO_SEARCH, RESCRAPE_BLOCKS, SCHEMA } from '../../config'; import { LastBlockProcessed } from '../../entities'; import { SCAN_END_BLOCK, RPC_LOGS_ERROR, SCAN_RESULTS, SCAN_START_BLOCK, SKIPPED_EVENTS } from '../../utils/metrics'; @@ -406,7 +406,7 @@ export const 
getStartBlockAsync = async ( }; } return { - startBlockNumber: lastKnownBlockNumber + 1, + startBlockNumber: lastKnownBlockNumber - (RESCRAPE_BLOCKS - 1), hasLatestBlockChanged: true, reorgLikely: false, }; diff --git a/src/tokenMetadataSingleton.ts b/src/tokenMetadataSingleton.ts index c34da2e3..a5013595 100644 --- a/src/tokenMetadataSingleton.ts +++ b/src/tokenMetadataSingleton.ts @@ -12,7 +12,7 @@ export class TokenMetadataSingleton { this.tokens = []; } - static async getInstance(connection: Connection, producer: Producer): Promise { + static async getInstance(connection: Connection, producer: Producer | null): Promise { if (!TokenMetadataSingleton.instance) { TokenMetadataSingleton.instance = new TokenMetadataSingleton(); const tmp = await connection diff --git a/src/utils/kafka_send.ts b/src/utils/kafka_send.ts index 87dfb5ee..4ed8d1a0 100644 --- a/src/utils/kafka_send.ts +++ b/src/utils/kafka_send.ts @@ -28,7 +28,7 @@ export interface CommandMessage { } export async function kafkaSendRawAsync( - producer: Producer, + producer: Producer | null, topic: string, keyFields: string[], payload: any[], @@ -38,33 +38,35 @@ export async function kafkaSendRawAsync( let currentSize = 0; let messages = []; - for (const message of payload) { - const jsonMessage = JSON.stringify(message); - const keyValues = keyFields.map((keyField) => String(message[keyField])); - const key = keyValues.join('-'); - const messageLength = jsonMessage.length; + if (producer !== null) { + for (const message of payload) { + const jsonMessage = JSON.stringify(message); + const keyValues = keyFields.map((keyField) => String(message[keyField])); + const key = keyValues.join('-'); + const messageLength = jsonMessage.length; - if (currentSize + messageLength >= MAX_SIZE) { - await producer.send({ - topic, - messages, - }); - currentSize = 0; - messages = []; + if (currentSize + messageLength >= MAX_SIZE) { + await producer.send({ + topic, + messages, + }); + currentSize = 0; + messages = []; + } + 
currentSize += messageLength; + messages.push({ key, value: jsonMessage }); } - currentSize += messageLength; - messages.push({ key, value: jsonMessage }); - } - await producer.send({ - topic, - messages, - }); + await producer.send({ + topic, + messages, + }); - logger.info(`Emitted ${payload.length} messages to ${topic}`); + logger.info(`Emitted ${payload.length} messages to ${topic}`); + } } export async function kafkaSendAsync( - producer: Producer, + producer: Producer | null, topic: string, keyFields: string[], payload: any[], @@ -72,11 +74,13 @@ export async function kafkaSendAsync( const dataPayload = payload.map((message) => { return { type: 'data', message }; }); - await kafkaSendRawAsync(producer, topic, keyFields, dataPayload); + if (producer != null) { + await kafkaSendRawAsync(producer, topic, keyFields, dataPayload); + } } export async function kafkaSendCommandAsync( - producer: Producer, + producer: Producer | null, topic: string, keyFields: string[], payload: CommandMessage[], @@ -84,5 +88,7 @@ export async function kafkaSendCommandAsync( const commandPayload = payload.map((message) => { return { type: 'command', message }; }); - await kafkaSendRawAsync(producer, topic, keyFields, commandPayload); + if (producer != null) { + await kafkaSendRawAsync(producer, topic, keyFields, commandPayload); + } }