Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make Kafka optional to Scrapers #142

Merged
merged 2 commits into from
Dec 5, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 19 additions & 8 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -30,16 +30,17 @@ services:
SCHEMA: 'events'
# FEAT_EXCLUSIVE_TOKENS_FROM_TRANSACTIONS: "true"
# TOKENS_FROM_TRANSACTIONS_START_BLOCK: 9193266
KAFKA_BROKERS: '${KAFKA_BROKERS}'
KAFKA_SSL: '${KAFKA_SSL}'
KAFKA_AUTH_USER: '${KAFKA_AUTH_USER}'
KAFKA_AUTH_PASSWORD: '${KAFKA_AUTH_PASSWORD}'
#KAFKA_BROKERS: '${KAFKA_BROKERS}'
#KAFKA_SSL: '${KAFKA_SSL}'
#KAFKA_AUTH_USER: '${KAFKA_AUTH_USER}'
#KAFKA_AUTH_PASSWORD: '${KAFKA_AUTH_PASSWORD}'
EP_DEPLOYMENT_BLOCK: 10247094
MAX_BLOCKS_TO_SEARCH: 1000
MAX_BLOCKS_TO_PULL: 1000
MAX_TX_TO_PULL: 1000
BLOCK_FINALITY_THRESHOLD: 0
SECONDS_BETWEEN_RUNS: 1
RESCRAPE_BLOCKS: 10
FEAT_UNISWAP_V2_VIP_SWAP_EVENT: "true"
UNISWAP_V2_VIP_SWAP_SOURCES: "UniswapV2,SushiSwap"
UNISWAP_V2_VIP_SWAP_START_BLOCK: 10917104
Expand Down Expand Up @@ -96,6 +97,7 @@ services:
MAX_BLOCKS_TO_SEARCH: 2000
MAX_BLOCKS_TO_PULL: 5000
SECONDS_BETWEEN_RUNS: 1
RESCRAPE_BLOCKS: 10
FEAT_UNISWAP_V2_VIP_SWAP_EVENT: "true"
UNISWAP_V2_VIP_SWAP_SOURCES: "PancakeSwap,BakerySwap,SushiSwap,CafeSwap,SwapLiquidity,ApeSwapFinance,CheeseSwap,Swap"
FEAT_UNISWAP_V2_PAIR_CREATED_EVENT: "true"
Expand Down Expand Up @@ -125,7 +127,8 @@ services:
EP_DEPLOYMENT_BLOCK: 14391480
MAX_BLOCKS_TO_SEARCH: 1000
MAX_BLOCKS_TO_PULL: 1000
MINUTES_BETWEEN_RUNS: 1
SECONDS_BETWEEN_RUNS: 1
RESCRAPE_BLOCKS: 10
FEAT_SLINGSHOT_TRADE_EVENT: "true"
SLINGSHOT_DEPLOYMENT_BLOCK: 14500000
FEAT_LIMIT_ORDERS: "true"
Expand Down Expand Up @@ -159,7 +162,10 @@ services:
EP_DEPLOYMENT_BLOCK: 3601700
MAX_BLOCKS_TO_SEARCH: 5000
MAX_BLOCKS_TO_PULL: 2000
MINUTES_BETWEEN_RUNS: 1
SECONDS_BETWEEN_RUNS: 1
RESCRAPE_BLOCKS: 10
SECONDS_BETWEEN_RUNS: 1
RESCRAPE_BLOCKS: 10
FEAT_ERC20_BRIDGE_TRANSFER_FLASHWALLET: "true"
FLASHWALLET_ADDRESS: "0xdb6f1920a889355780af7570773609bd8cb1f498"
FLASHWALLET_DEPLOYMENT_BLOCK: 11805869
Expand Down Expand Up @@ -190,7 +196,8 @@ services:
EP_DEPLOYMENT_BLOCK: 18855765
MAX_BLOCKS_TO_SEARCH: 2000
MAX_BLOCKS_TO_PULL: 1000
SECONDS_BETWEEN_RUNS: 60
SECONDS_BETWEEN_RUNS: 1
RESCRAPE_BLOCKS: 10
FEAT_ERC20_BRIDGE_TRANSFER_FLASHWALLET: "true"
FLASHWALLET_ADDRESS: "0xb4d961671cadfed687e040b076eee29840c142e5"
FLASHWALLET_DEPLOYMENT_BLOCK: 18855797
Expand Down Expand Up @@ -219,7 +226,8 @@ services:
EP_DEPLOYMENT_BLOCK: 9350111
MAX_BLOCKS_TO_SEARCH: 5000
MAX_BLOCKS_TO_PULL: 2000
MINUTES_BETWEEN_RUNS: 1
SECONDS_BETWEEN_RUNS: 1
RESCRAPE_BLOCKS: 10
FEAT_NFT: "true"
NFT_FEATURE_START_BLOCK: 11820000

Expand All @@ -243,6 +251,7 @@ services:
MAX_BLOCKS_TO_SEARCH: 1000
MAX_BLOCKS_TO_PULL: 1000
SECONDS_BETWEEN_RUNS: 1
RESCRAPE_BLOCKS: 10
EP_ADDRESS: "0xdef1abe32c034e558cdd535791643c58a13acc10"
FEAT_TRANSFORMED_ERC20_EVENT: "true"
FEAT_NFT: "true"
Expand Down Expand Up @@ -273,6 +282,7 @@ services:
MAX_BLOCKS_TO_SEARCH: 1000
MAX_BLOCKS_TO_PULL: 1000
SECONDS_BETWEEN_RUNS: 30
RESCRAPE_BLOCKS: 10
EP_ADDRESS: "0xdef1c0ded9bec7f1a1670819833240f027b25eff"
FEAT_NFT: "true"
NFT_FEATURE_START_BLOCK: 4050733
Expand All @@ -295,6 +305,7 @@ services:
MAX_BLOCKS_TO_SEARCH: 1000
MAX_BLOCKS_TO_PULL: 100
SECONDS_BETWEEN_RUNS: 1
RESCRAPE_BLOCKS: 10
FEAT_NFT: "true"
NFT_FEATURE_START_BLOCK: 1410394
KAFKA_BROKERS: '${KAFKA_BROKERS}'
Expand Down
17 changes: 11 additions & 6 deletions src/config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import {
DEFAULT_FEAT_LIMIT_ORDERS,
DEFAULT_FEAT_META_TRANSACTION_EXECUTED_EVENT,
DEFAULT_FEAT_NFT,
DEFAULT_FEAT_ONCHAIN_GOVERNANCE,
DEFAULT_FEAT_OTC_ORDERS,
DEFAULT_FEAT_PLP_SWAP_EVENT,
DEFAULT_FEAT_POLYGON_RFQM_PAYMENTS,
Expand All @@ -19,26 +20,28 @@ import {
DEFAULT_FEAT_UNISWAP_V2_PAIR_CREATED_EVENT,
DEFAULT_FEAT_UNISWAP_V2_SYNC_EVENT,
DEFAULT_FEAT_UNISWAP_V2_VIP_SWAP_EVENT,
DEFAULT_FEAT_UNISWAP_V3_VIP_SWAP_EVENT,
DEFAULT_FEAT_UNISWAP_V3_SWAP_EVENT,
DEFAULT_FEAT_UNISWAP_V3_VIP_SWAP_EVENT,
DEFAULT_FEAT_V3_FILL_EVENT,
DEFAULT_FEAT_V3_NATIVE_FILL,
DEFAULT_FEAT_WRAP_UNWRAP_NATIVE_EVENT,
DEFAULT_FEAT_WRAP_UNWRAP_NATIVE_TRANSFER_EVENT,
DEFAULT_LOCAL_POSTGRES_URI,
DEFAULT_MAX_BLOCKS_REORG,
DEFAULT_MAX_BLOCKS_TO_PULL,
DEFAULT_MAX_BLOCKS_TO_SEARCH,
DEFAULT_MAX_BLOCKS_REORG,
DEFAULT_MAX_TIME_TO_SEARCH,
DEFAULT_MAX_TX_TO_PULL,
DEFAULT_METRICS_PATH,
DEFAULT_MINUTES_BETWEEN_RUNS,
DEFAULT_PROMETHEUS_PORT,
DEFAULT_RESCRAPE_BLOCKS,
DEFAULT_STAKING_POOLS_JSON_URL,
DEFAULT_STAKING_POOLS_METADATA_JSON_URL,
DEFAULT_FEAT_ONCHAIN_GOVERNANCE,
DEFAULT_FEAT_WRAP_UNWRAP_NATIVE_EVENT,
DEFAULT_FEAT_WRAP_UNWRAP_NATIVE_TRANSFER_EVENT,
} from './constants';

import { logger } from './utils';

const throwError = (err: string) => {
throw new Error(err);
};
Expand Down Expand Up @@ -407,7 +410,7 @@ validateAddress(

export const KAFKA_BROKERS = process.env.KAFKA_BROKERS ? process.env.KAFKA_BROKERS.split(',') : [];
if (KAFKA_BROKERS.length === 0) {
throwError(`KAFKA_BROKERS is missing`);
logger.warn('KAFKA_BROKERS is empty, disabling kafka');
}
export const KAFKA_AUTH_USER = process.env.KAFKA_AUTH_USER!;
export const KAFKA_AUTH_PASSWORD = process.env.KAFKA_AUTH_PASSWORD!;
Expand All @@ -431,6 +434,8 @@ validateStartBlock(
FEAT_SOCKET_BRIDGE_EVENT,
);

export const RESCRAPE_BLOCKS = getIntConfig('RESCRAPE_BLOCKS', DEFAULT_RESCRAPE_BLOCKS);

function getBoolConfig(env: string, defaultValue: boolean): boolean {
if (Object.prototype.hasOwnProperty.call(process.env, env)) {
return process.env[env] === 'true';
Expand Down
1 change: 1 addition & 0 deletions src/constants.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ export const DEFAULT_MAX_BLOCKS_TO_PULL = 120;
export const DEFAULT_MAX_BLOCKS_TO_SEARCH = 120;
export const DEFAULT_MAX_TX_TO_PULL = 1000;
export const DEFAULT_BLOCK_FINALITY_THRESHOLD = 10;
export const DEFAULT_RESCRAPE_BLOCKS = 0;
export const DEFAULT_MINUTES_BETWEEN_RUNS = 3;
export const DEFAULT_STAKING_POOLS_JSON_URL =
'https://raw.githubusercontent.com/0xProject/0x-staking-pool-registry/master/staking_pools.json';
Expand Down
34 changes: 20 additions & 14 deletions src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -32,20 +32,24 @@ import { startMetricsServer } from './utils/metrics';
import { TokenMetadataSingleton } from './tokenMetadataSingleton';
import { UniV2PoolSingleton } from './uniV2PoolSingleton';

const kafka = new Kafka({
clientId: 'event-pipeline',
brokers: KAFKA_BROKERS,
ssl: KAFKA_SSL,
sasl: KAFKA_SSL
? {
mechanism: 'plain',
username: KAFKA_AUTH_USER,
password: KAFKA_AUTH_PASSWORD,
}
: undefined,
});
let producer: Producer | null = null;

const producer = kafka.producer();
if (KAFKA_BROKERS.length > 0) {
const kafka = new Kafka({
clientId: 'event-pipeline',
brokers: KAFKA_BROKERS,
ssl: KAFKA_SSL,
sasl: KAFKA_SSL
? {
mechanism: 'plain',
username: KAFKA_AUTH_USER,
password: KAFKA_AUTH_PASSWORD,
}
: undefined,
});

producer = kafka.producer();
}

logger.info('App is running...');

Expand All @@ -68,7 +72,9 @@ chainIdChecker.checkChainId(CHAIN_ID);
// run pull and save events
createConnection(ormConfig as ConnectionOptions)
.then(async (connection) => {
await producer.connect();
if (producer) {
await producer.connect();
}
await TokenMetadataSingleton.getInstance(connection, producer);
if (FEAT_UNISWAP_V2_PAIR_CREATED_EVENT) {
await UniV2PoolSingleton.initInstance(connection);
Expand Down
4 changes: 2 additions & 2 deletions src/scripts/utils/event_abi_utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import { TokenMetadataMap, extractTokensFromLogs, getParseSaveTokensAsync, getPa

import { RawLogEntry } from 'ethereum-types';

import { CHAIN_NAME_LOWER, MAX_BLOCKS_REORG, MAX_BLOCKS_TO_SEARCH, SCHEMA } from '../../config';
import { CHAIN_NAME_LOWER, MAX_BLOCKS_REORG, MAX_BLOCKS_TO_SEARCH, RESCRAPE_BLOCKS, SCHEMA } from '../../config';
import { LastBlockProcessed } from '../../entities';
import { SCAN_END_BLOCK, RPC_LOGS_ERROR, SCAN_RESULTS, SCAN_START_BLOCK, SKIPPED_EVENTS } from '../../utils/metrics';

Expand Down Expand Up @@ -406,7 +406,7 @@ export const getStartBlockAsync = async (
};
}
return {
startBlockNumber: lastKnownBlockNumber + 1,
startBlockNumber: lastKnownBlockNumber - (RESCRAPE_BLOCKS - 1),
hasLatestBlockChanged: true,
reorgLikely: false,
};
Expand Down
2 changes: 1 addition & 1 deletion src/tokenMetadataSingleton.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ export class TokenMetadataSingleton {
this.tokens = [];
}

static async getInstance(connection: Connection, producer: Producer): Promise<TokenMetadataSingleton> {
static async getInstance(connection: Connection, producer: Producer | null): Promise<TokenMetadataSingleton> {
if (!TokenMetadataSingleton.instance) {
TokenMetadataSingleton.instance = new TokenMetadataSingleton();
const tmp = await connection
Expand Down
56 changes: 31 additions & 25 deletions src/utils/kafka_send.ts
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ export interface CommandMessage {
}

export async function kafkaSendRawAsync(
producer: Producer,
producer: Producer | null,
topic: string,
keyFields: string[],
payload: any[],
Expand All @@ -38,51 +38,57 @@ export async function kafkaSendRawAsync(
let currentSize = 0;
let messages = [];

for (const message of payload) {
const jsonMessage = JSON.stringify(message);
const keyValues = keyFields.map((keyField) => String(message[keyField]));
const key = keyValues.join('-');
const messageLength = jsonMessage.length;
if (producer !== null) {
for (const message of payload) {
const jsonMessage = JSON.stringify(message);
const keyValues = keyFields.map((keyField) => String(message[keyField]));
const key = keyValues.join('-');
const messageLength = jsonMessage.length;

if (currentSize + messageLength >= MAX_SIZE) {
await producer.send({
topic,
messages,
});
currentSize = 0;
messages = [];
if (currentSize + messageLength >= MAX_SIZE) {
await producer.send({
topic,
messages,
});
currentSize = 0;
messages = [];
}
currentSize += messageLength;
messages.push({ key, value: jsonMessage });
}
currentSize += messageLength;
messages.push({ key, value: jsonMessage });
}
await producer.send({
topic,
messages,
});
await producer.send({
topic,
messages,
});

logger.info(`Emitted ${payload.length} messages to ${topic}`);
logger.info(`Emitted ${payload.length} messages to ${topic}`);
}
}

export async function kafkaSendAsync(
producer: Producer,
producer: Producer | null,
topic: string,
keyFields: string[],
payload: any[],
): Promise<void> {
const dataPayload = payload.map((message) => {
return { type: 'data', message };
});
await kafkaSendRawAsync(producer, topic, keyFields, dataPayload);
if (producer != null) {
await kafkaSendRawAsync(producer, topic, keyFields, dataPayload);
}
}

export async function kafkaSendCommandAsync(
producer: Producer,
producer: Producer | null,
topic: string,
keyFields: string[],
payload: CommandMessage[],
): Promise<void> {
const commandPayload = payload.map((message) => {
return { type: 'command', message };
});
await kafkaSendRawAsync(producer, topic, keyFields, commandPayload);
if (producer != null) {
await kafkaSendRawAsync(producer, topic, keyFields, commandPayload);
}
}
Loading