diff --git a/.env b/.env index c92a312d..cf0599d1 100644 --- a/.env +++ b/.env @@ -5,6 +5,8 @@ NETWORK_ID=mainnet # Testnet - Taurus # NETWORK_ID=taurus +LOG_LEVEL=debug + # Mainnet NODE_DOCKER_TAG="mainnet-2024-nov-05" # Testnet - Taurus @@ -44,6 +46,10 @@ REDIS_PORT=6379 BULL_USERNAME=bull BULL_PASSWORD=board BULL_PORT=3020 +BULL_SESSION_SECRET=keyboardcat NR_API_KEY="" -NR_AGENT_IDENTIFIER="" \ No newline at end of file +NR_AGENT_IDENTIFIER="" + +SLACK_TOKEN="" +SLACK_CONVERSATION_ID="" \ No newline at end of file diff --git a/.vscode/astral.code-workspace b/.vscode/astral.code-workspace index bd3f8ce5..1ecce11a 100644 --- a/.vscode/astral.code-workspace +++ b/.vscode/astral.code-workspace @@ -29,8 +29,8 @@ "path": "../indexers/gemini-3h" }, { - "name": "indexer - Gemini 3G", - "path": "../indexers/gemini-3g" + "name": "Taskboard", + "path": "../indexers/taskboard" } ], "settings": { diff --git a/docker-compose.yml b/docker-compose.yml index f14239f7..81f7ba9a 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -143,6 +143,7 @@ services: # Task Board Action Endpoint HASURA_GRAPHQL_ACTION_TASK_BOARD: http://taskboard:${BULL_PORT} + HASURA_GRAPHQL_ACTION_TASK_BOARD_SESSION_SECRET: ${BULL_SESSION_SECRET} ports: - "${HASURA_GRAPHQL_PORT}:8080" command: @@ -342,6 +343,7 @@ services: BULL_USERNAME: ${BULL_USERNAME} BULL_PASSWORD: ${BULL_PASSWORD} BULL_PORT: ${BULL_PORT} + BULL_SESSION_SECRET: ${BULL_SESSION_SECRET} REDIS_HOST: redis REDIS_PORT: ${REDIS_PORT} @@ -351,3 +353,9 @@ services: DB_DATABASE: ${DB_DATABASE} DB_HOST: ${DB_HOST} DB_PORT: ${DB_PORT} + + NETWORK_ID: ${NETWORK_ID} + LOG_LEVEL: ${LOG_LEVEL} + + SLACK_TOKEN: ${SLACK_TOKEN} + SLACK_CONVERSATION_ID: ${SLACK_CONVERSATION_ID} diff --git a/indexers/db/metadata/actions.graphql b/indexers/db/metadata/actions.graphql index fe8712f0..bf8e22a2 100644 --- a/indexers/db/metadata/actions.graphql +++ b/indexers/db/metadata/actions.graphql @@ -1,7 +1,9 @@ type Mutation { - updateAccount( - args: AddTaskUpdateAccountInput! - ): UpdateAccountOutput + slackNotification(args: SlackNotificationInput!): SlackNotificationOutput +} + +type Mutation { + updateAccount(args: UpdateAccountInput!): UpdateAccountOutput } input UpdateAccountInput { @@ -9,13 +11,18 @@ input UpdateAccountInput { accountId: String! } -input AddTaskUpdateAccountInput { - queueName: String! - data: UpdateAccountInput! - jobId: String! +input SlackNotificationInput { + title: String! + path: String + message: String + logData: String + messageId: String } type UpdateAccountOutput { blockNumber: String! } +type SlackNotificationOutput { + title: String! +} diff --git a/indexers/db/metadata/actions.yaml b/indexers/db/metadata/actions.yaml index e7937044..c07029ff 100644 --- a/indexers/db/metadata/actions.yaml +++ b/indexers/db/metadata/actions.yaml @@ -1,15 +1,32 @@ actions: + - name: slackNotification + definition: + kind: asynchronous + handler: "{{HASURA_GRAPHQL_ACTION_TASK_BOARD}}/add-task" + forward_client_headers: true + headers: + - name: taskboard_session_secret + value: "{{HASURA_GRAPHQL_ACTION_TASK_BOARD_SESSION_SECRET}}" + permissions: + - role: astral-api + comment: Send a slack notification - name: updateAccount definition: kind: asynchronous - handler: '{{HASURA_GRAPHQL_ACTION_TASK_BOARD}}/add-task' + handler: "{{HASURA_GRAPHQL_ACTION_TASK_BOARD}}/add-task" forward_client_headers: true + headers: + - name: taskboard_session_secret + value: "{{HASURA_GRAPHQL_ACTION_TASK_BOARD_SESSION_SECRET}}" + permissions: + - role: astral-api comment: Update account in db (will add the account if it was not indexed before) custom_types: enums: [] input_objects: - name: UpdateAccountInput - - name: AddTaskUpdateAccountInput + - name: SlackNotificationInput objects: - name: UpdateAccountOutput + - name: SlackNotificationOutput scalars: [] diff --git a/indexers/taskboard/Dockerfile b/indexers/taskboard/Dockerfile index 1c282cc7..50657b1a 100644 --- a/indexers/taskboard/Dockerfile +++ b/indexers/taskboard/Dockerfile @@ -32,7 +32,7 @@ USER node RUN yarn install # Expose the port the app runs on -EXPOSE 3000 +EXPOSE ${BULL_PORT} # Command to run the app -CMD ["yarn", "start"] \ No newline at end of file +CMD yarn start diff --git a/indexers/taskboard/index.js b/indexers/taskboard/index.js deleted file mode 100644 index f7fefcb0..00000000 --- a/indexers/taskboard/index.js +++ /dev/null @@ -1,187 +0,0 @@ -const express = require("express"); -const bodyParser = require("body-parser"); -const session = require("express-session"); -const passport = require("passport"); -const LocalStrategy = require("passport-local").Strategy; -const { createBullBoard } = require("@bull-board/api"); -const { BullMQAdapter } = require("@bull-board/api/bullMQAdapter"); -const { ExpressAdapter } = require("@bull-board/express"); -const { ensureLoggedIn } = require("connect-ensure-login"); -const tasks = require("./tasks"); -const { checkRedisConnection } = require("./utils/store"); -const { - NETWORKS, - QUEUES, - JOB_RETENTION_HOURS, - GARBAGE_COLLECTION_INTERVAL, - BULL_BOARD_OPTIONS, -} = require("./constants"); -const { - createQueueMQ, - setupBullMQProcessor, - cleanOldJobs, -} = require("./utils/bull"); - -passport.use( - new LocalStrategy((username, password, cb) => { - if ( - username === process.env.BULL_USERNAME && - password === process.env.BULL_PASSWORD - ) { - return cb(null, { user: "bull-board" }); - } - return cb(null, false); - }) -); - -passport.serializeUser((user, cb) => cb(null, user)); -passport.deserializeUser((user, cb) => cb(null, user)); - -const run = async () => { - await checkRedisConnection(); - const app = express(); - app.set("views", `${__dirname}/views`); - app.set("view engine", "ejs"); - - const activeQueues = QUEUES.filter((q) => q.enabled); - - const queues = {}; - const serverAdapters = {}; - - for (const network of NETWORKS) { - for (const queue of activeQueues) { - const queueName = `${network} - ${queue.title}`; - const _queue = createQueueMQ(queueName); - const task = tasks[queue.name]; - if (task) { - await setupBullMQProcessor(_queue.name, task); - } - if (!queues[network]) { - queues[network] = {}; - } - queues[network][queue.name] = _queue; - } - - const serverAdapter = new ExpressAdapter(); - createBullBoard({ - queues: activeQueues.map( - (queue) => new BullMQAdapter(queues[network][queue.name]) - ), - serverAdapter, - options: BULL_BOARD_OPTIONS, - }); - - serverAdapter.setBasePath(`/${network}`); - app.use(`/${network}`, serverAdapter.getRouter()); - serverAdapters[network] = serverAdapter; - } - - for (const network of NETWORKS) { - app.use(`/${network}/add`, async (req, res) => { - const opts = req.query.opts || {}; - const jobId = req.query.jobId; - - if (!jobId) { - return res.status(400).json({ error: "jobId is required" }); - } - - const queue = queues[network][activeQueues[0].name]; - const existingJob = await queue.getJob(jobId); - - if ( - existingJob && - !existingJob.isCompleted() && - !existingJob.isFailed() - ) { - return res.status(400).json({ error: "Job is already in progress" }); - } - - await queue.add( - `Add ${network}`, - { title: req.query.title }, - { ...opts, jobId } - ); - - res.json({ ok: true }); - }); - } - - app.use(bodyParser.json()); - app.post("/add-task", async (req, res) => { - let { queueName, taskName, data, opts, jobId } = req.body; - if (req.body.action) { - taskName = req.body.action.name; - ({ queueName, data, opts, jobId } = req.body.input.args); - } - console.log("jobId: ", jobId); - - if (!jobId) { - console.log("jobId is required"); - return res.status(400).json({ error: "jobId is required" }); - } - - try { - const queue = queues[queueName]; - if (!queue) return res.status(400).json({ error: "Invalid queue name" }); - - const existingTaskQueue = queue[taskName]; - if (!existingTaskQueue) { - console.log("Invalid task name"); - return res.status(400).json({ error: "Invalid task name" }); - } - const existingJob = await existingTaskQueue.getJob(jobId); - - if ( - existingJob && - !existingJob.isCompleted() && - !existingJob.isFailed() - ) { - console.log("Job is already in progress"); - return res.status(400).json({ error: "Job is already in progress" }); - } - - const job = await existingTaskQueue.add(taskName, data, { - ...opts, - jobId, - }); - if (!job) { - console.log("Failed to create task"); - return res.status(500).json({ error: "Failed to create task" }); - } - res.json({ - ok: true, - message: `Task added to ${queueName}`, - jobId: job.id, - }); - } catch (error) { - console.log("Error adding task:", error); - res.status(500).json({ error: error.message }); - } - }); - - // Schedule garbage collection to run every hour - setInterval(async () => { - for (const network in queues) { - for (const queueName in queues[network]) { - await cleanOldJobs(queues[network][queueName], JOB_RETENTION_HOURS); - } - } - }, GARBAGE_COLLECTION_INTERVAL); - - app.listen(process.env.BULL_PORT || 3020, () => { - console.log(`Running on ${process.env.BULL_PORT}...`); - NETWORKS.forEach((network) => { - console.log( - `For the UI of ${network}, open http://localhost:${process.env.BULL_PORT}/${network}` - ); - }); - console.log("Make sure Redis is running on port 6379 by default"); - console.log("To populate the queue, run:"); - console.log( - "To add a task to the queue, send a POST request to //add-task with JSON body:" - ); - }); -}; - -// eslint-disable-next-line no-console -run().catch((e) => console.error(e)); diff --git a/indexers/taskboard/package.json b/indexers/taskboard/package.json index 626920aa..43aba23e 100644 --- a/indexers/taskboard/package.json +++ b/indexers/taskboard/package.json @@ -4,9 +4,10 @@ "description": "A taskboard using Express and BullMQ to manage tasks", "private": true, "license": "MIT", - "main": "index.js", + "main": "src/index.ts", "scripts": { - "start": "node index.js" + "start": "ts-node src/index.ts", + "dev": "export $(grep -v '^#' ../../.env | xargs) && export $(grep -v '^#' ../../.env.dev | xargs) && ts-node-dev src/index.ts" }, "dependencies": { "@autonomys/auto-consensus": "^1.0.1", @@ -15,7 +16,7 @@ "@bull-board/express": "^5.23.0", "body-parser": "^1.20.0", "bullmq": "^4.6.0", - "connect-ensure-login": "^0.1.1", + "connect-redis": "^7.1.1", "express": "^4.21.0", "express-session": "^1.17.2", "ioredis": "^5.4.1", @@ -23,6 +24,15 @@ "passport-local": "^1.0.0", "pg": "^8.13.0" }, + "devDependencies": { + "@types/bull": "^4.10.4", + "@types/express": "^5.0.0", + "@types/express-session": "^1.18.0", + "@types/node": "^22.9.0", + "ts-node": "^10.9.2", + "ts-node-dev": "^2.0.0", + "typescript": "^5.2.2" + }, "workspaces": { "nohoist": [ "**" diff --git a/indexers/taskboard/constants/index.js b/indexers/taskboard/src/constants/index.ts similarity index 74% rename from indexers/taskboard/constants/index.js rename to indexers/taskboard/src/constants/index.ts index da0fde03..b68a2233 100644 --- a/indexers/taskboard/constants/index.js +++ b/indexers/taskboard/src/constants/index.ts @@ -1,26 +1,49 @@ -const NETWORKS = ["mainnet", "taurus"]; -const QUEUES = [ +const NETWORK = process.env.NETWORK_ID || "mainnet"; +const SLACK = "slack"; +export const QUEUES = [NETWORK, SLACK]; +export const TASKS_QUEUES = [ { + queue: NETWORK, name: "consensusUniqueRowsMapping", title: "Consensus Unique Rows Mapping", enabled: true, }, { + queue: NETWORK, name: "leaderboardSortAndRank", title: "Leaderboard Sort and Rank", enabled: true, }, { + queue: NETWORK, name: "updateAccount", title: "Update Account", enabled: true, }, + { + queue: SLACK, + name: "slackNotification", + title: "Slack Notification", + enabled: true, + }, ]; -const JOB_RETENTION_HOURS = 24; -const GARBAGE_COLLECTION_INTERVAL = 60 * 60 * 1000; // 1 hour in milliseconds +export enum ROUTES { + LOGIN = "/", + DASHBOARD = "/dashboard", + + POST_LOGIN = "/login", + POST_ADD_TASK = "/add-task", +} +export enum VIEWS { + LOGIN = "login", + DASHBOARD = "index", +} -const BULL_BOARD_OPTIONS = { +export const JOB_RETENTION_HOURS = 24; +export const GARBAGE_COLLECTION_INTERVAL = 60 * 60 * 1000; // 1 hour in milliseconds + +export const BULL_BOARD_OPTIONS = { uiConfig: { boardTitle: "Indexers Tasks", boardLogo: { @@ -29,7 +52,7 @@ const BULL_BOARD_OPTIONS = { }, }; -const LEADERBOARD_ENTRY_TYPE = { +export const LEADERBOARD_ENTRY_TYPE = { ACCOUNT_TRANSFER_SENDER_TOTAL_COUNT: "AccountTransferSenderTotalCount", ACCOUNT_TRANSFER_SENDER_TOTAL_VALUE: "AccountTransferSenderTotalValue", ACCOUNT_TRANSFER_RECEIVER_TOTAL_COUNT: "AccountTransferReceiverTotalCount", @@ -56,12 +79,3 @@ const LEADERBOARD_ENTRY_TYPE = { NOMINATOR_DEPOSITS_TOTAL_VALUE: "NominatorDepositsTotalValue", NOMINATOR_WITHDRAWALS_TOTAL_COUNT: "NominatorWithdrawalsTotalCount", }; - -module.exports = { - NETWORKS, - QUEUES, - JOB_RETENTION_HOURS, - GARBAGE_COLLECTION_INTERVAL, - BULL_BOARD_OPTIONS, - LEADERBOARD_ENTRY_TYPE, -}; diff --git a/indexers/taskboard/src/index.ts b/indexers/taskboard/src/index.ts new file mode 100644 index 00000000..3cb11770 --- /dev/null +++ b/indexers/taskboard/src/index.ts @@ -0,0 +1,207 @@ +import { createBullBoard } from "@bull-board/api"; +import { BullMQAdapter } from "@bull-board/api/bullMQAdapter"; +import { ExpressAdapter } from "@bull-board/express"; +import bodyParser from "body-parser"; +import RedisStore from "connect-redis"; +import express, { Express } from "express"; +import session from "express-session"; +import Redis from "ioredis"; +import passport from "passport"; +import LocalStrategy from "passport-local"; +import { + BULL_BOARD_OPTIONS, + GARBAGE_COLLECTION_INTERVAL, + JOB_RETENTION_HOURS, + QUEUES, + ROUTES, + TASKS_QUEUES, + VIEWS, +} from "./constants"; +import { tasks } from "./tasks"; +import { + cleanOldJobs, + createQueueMQ, + setupBullMQProcessor, +} from "./utils/bull"; +import { log, returnError } from "./utils/helper"; +import { checkRedisConnection, connection } from "./utils/store"; + +declare module "express-session" { + interface SessionData { + authenticated: boolean; + } +} + +const SESSION_SECRET = process.env.BULL_SESSION_SECRET || "keyboard cat"; + +passport.use( + "api", + new LocalStrategy.Strategy( + { usernameField: "username", passwordField: "password" }, + (username, password, cb) => { + if ( + username === process.env.BULL_USERNAME && + password === process.env.BULL_PASSWORD + ) + return cb(null, { user: "bull-board" }); + return cb(null, false); + } + ) +); + +passport.serializeUser((user, cb) => cb(null, user)); +passport.deserializeUser((user, cb) => cb(null, user)); + +const run = async () => { + await checkRedisConnection(); + const app: Express = express(); + const RedisClient = new Redis(connection); + + const tasksQueues = {}; + const serverAdapters = {}; + + app.use( + session({ + secret: SESSION_SECRET, + resave: false, + saveUninitialized: false, + store: new RedisStore({ client: RedisClient }), + }) + ); + + app.use(passport.initialize()); + app.use(passport.session()); + app.use(bodyParser.json()); + + app.set("views", `${__dirname}/views`); + app.set("view engine", "ejs"); + + app.get(ROUTES.LOGIN, (req, res) => { + res.render("login", { invalid: false }); + }); + app.get(ROUTES.DASHBOARD, (req, res) => { + if (req.session.authenticated) + res.render(VIEWS.DASHBOARD, { queues: QUEUES }); + else res.redirect(ROUTES.LOGIN); + }); + + app.post( + ROUTES.POST_LOGIN, + express.urlencoded({ extended: true }), + (req, res) => { + const { username, password } = req.body; + if ( + username === process.env.BULL_USERNAME && + password === process.env.BULL_PASSWORD + ) { + req.session.authenticated = true; + res.redirect(ROUTES.DASHBOARD); + } else res.render(VIEWS.LOGIN, { invalid: true }); + } + ); + + for (const queue of QUEUES) { + const activeTasksQueues = TASKS_QUEUES.filter( + (q) => q.queue === queue && q.enabled + ); + + for (const tasksQueue of activeTasksQueues) { + const queueName = `${queue} - ${tasksQueue.title}`; + const _queue = createQueueMQ(queueName); + const task = tasks[tasksQueue.name]; + if (task) await setupBullMQProcessor(_queue.name, task); + if (!tasksQueues[queue]) { + tasksQueues[queue] = {}; + } + tasksQueues[queue][tasksQueue.name] = _queue; + } + + const serverAdapter = new ExpressAdapter(); + createBullBoard({ + queues: activeTasksQueues.map( + (q) => new BullMQAdapter(tasksQueues[queue][q.name]) + ), + serverAdapter, + options: BULL_BOARD_OPTIONS, + }); + + serverAdapter.setBasePath(`/${queue}`); + + app.use(`/${queue}`, (req, res) => { + if (req.session.authenticated) serverAdapter.getRouter()(req, res); + else res.redirect(ROUTES.LOGIN); + }); + serverAdapters[queue] = serverAdapter; + } + + app.post(ROUTES.POST_ADD_TASK, async (req, res) => { + log("req.body: ", req.body); + let { queueName, taskName, data, opts, jobId } = req.body; + + // Handle Hasura action + if (req.body.action) { + if (req.headers.taskboard_session_secret !== SESSION_SECRET) + returnError(res, "Invalid session secret"); + const requestId = req.headers["x-request-id"]; + if (!requestId) returnError(res, "Request ID is required"); + + log("Hasura action: ", req.body.action); + const matchingTask = TASKS_QUEUES.find( + (t) => t.name === req.body.action.name + ); + if (!matchingTask) returnError(res, "Invalid task name"); + + // Infer queue, task name and jobId from action request + queueName = matchingTask.queue; + taskName = matchingTask.name; + jobId = `${queueName}:${taskName}:hasura:${requestId}`; + data = req.body.input.args; + } + + console.log("jobId: ", jobId); + if (!jobId) returnError(res, "jobId is required"); + + try { + const tasksQueue = tasksQueues[queueName]; + if (!tasksQueue) returnError(res, "Invalid queue name"); + + const existingTaskQueue = tasksQueue[taskName]; + if (!existingTaskQueue) returnError(res, "Invalid task name"); + const existingJob = await existingTaskQueue.getJob(jobId); + + if (existingJob && !existingJob.isCompleted() && !existingJob.isFailed()) + returnError(res, "Job is already in progress"); + + const job = await existingTaskQueue.add(taskName, data, { + ...opts, + jobId, + }); + if (!job) returnError(res, "Failed to create task", 500); + + res.send({ + ok: true, + message: `Task added to ${queueName}`, + jobId: job.id, + }); + } catch (error) { + log("Error adding task:", error); + returnError(res, error.message, 500); + } + }); + + // Schedule garbage collection to run every hour + setInterval(async () => { + for (const queue in tasksQueues) { + for (const queueName in tasksQueues[queue]) { + await cleanOldJobs(tasksQueues[queue][queueName], JOB_RETENTION_HOURS); + } + } + }, GARBAGE_COLLECTION_INTERVAL); + + app.listen(process.env.BULL_PORT || 3020, () => { + console.log(`Running on port ${process.env.BULL_PORT}...`); + }); +}; + +// eslint-disable-next-line no-console +run().catch((e) => console.error(e)); diff --git a/indexers/taskboard/tasks/consensus.js b/indexers/taskboard/src/tasks/consensus.ts similarity index 77% rename from indexers/taskboard/tasks/consensus.js rename to indexers/taskboard/src/tasks/consensus.ts index 898e44e4..8a14bc3d 100644 --- a/indexers/taskboard/tasks/consensus.js +++ b/indexers/taskboard/src/tasks/consensus.ts @@ -1,17 +1,31 @@ -const { connectToDB, queries } = require("../utils/db"); +import { Job } from "bull"; +import { Pool, PoolClient } from "pg"; +import { connectToDB, queries } from "../utils/db"; -async function consensusUniqueRowsMapping(job) { +interface ConsensusResult { + blockNumber: number; + updatedTables: string[]; + query: string[]; +} + +interface JobData { + blockNumber: number; +} + +export const consensusUniqueRowsMapping = async ( + job: Job +): Promise => { const { blockNumber } = job.data; - const pool = await connectToDB(); + const pool: Pool = await connectToDB(); - const result = { + const result: ConsensusResult = { blockNumber, updatedTables: [], query: [], }; try { - const client = await pool.connect(); + const client: PoolClient = await pool.connect(); // To-Do: Implement the logic to update the consensus tables // Tables: @@ -59,7 +73,7 @@ async function consensusUniqueRowsMapping(job) { } catch (err) { await client.query("ROLLBACK"); console.error("Error updating consensus tables:", err); - throw new Error("Failed to update consensus tables: " + err); + throw new Error(`Failed to update consensus tables: ${err}`); } finally { client.release(); } @@ -67,10 +81,6 @@ async function consensusUniqueRowsMapping(job) { return result; } catch (err) { console.error("Error in consensus:", err); - throw new Error("Failed to update consensus tables: " + err); + throw new Error(`Failed to update consensus tables: ${err}`); } -} - -module.exports = { - consensusUniqueRowsMapping, }; diff --git a/indexers/taskboard/src/tasks/index.ts b/indexers/taskboard/src/tasks/index.ts new file mode 100644 index 00000000..2582941e --- /dev/null +++ b/indexers/taskboard/src/tasks/index.ts @@ -0,0 +1,32 @@ +import { consensusUniqueRowsMapping } from "./consensus"; +import { leaderboardSortAndRank } from "./leaderboardSortAndRank"; +import { slackNotification } from "./slackNotification"; +import { updateAccount } from "./updateAccount"; + +interface Task { + handler: (...args: any[]) => Promise; + concurrency: number; +} + +interface Tasks { + [key: string]: Task; +} + +export const tasks: Tasks = { + consensusUniqueRowsMapping: { + handler: consensusUniqueRowsMapping, + concurrency: 1, + }, + leaderboardSortAndRank: { + handler: leaderboardSortAndRank, + concurrency: 1, + }, + updateAccount: { + handler: updateAccount, + concurrency: 1, + }, + slackNotification: { + handler: slackNotification, + concurrency: 1, + }, +}; diff --git a/indexers/taskboard/tasks/leaderboardSortAndRank.js b/indexers/taskboard/src/tasks/leaderboardSortAndRank.ts similarity index 57% rename from indexers/taskboard/tasks/leaderboardSortAndRank.js rename to indexers/taskboard/src/tasks/leaderboardSortAndRank.ts index 9106a1ed..dbeabb2e 100644 --- a/indexers/taskboard/tasks/leaderboardSortAndRank.js +++ b/indexers/taskboard/src/tasks/leaderboardSortAndRank.ts @@ -1,18 +1,38 @@ -const { connectToDB, queries, entryTypeToTable } = require("../utils/db"); -const { LEADERBOARD_ENTRY_TYPE } = require("../constants"); +import { Pool, PoolClient } from "pg"; +import { LEADERBOARD_ENTRY_TYPE } from "../constants"; +import { connectToDB, entryTypeToTable, queries } from "../utils/db"; -async function leaderboardSortAndRank(job) { +interface Job { + data: { + blockNumber: number; + }; +} + +interface UpdatedTable { + table: string; + rowCount: number; +} + +interface LeaderboardResult { + blockNumber: number; + updatedTables: UpdatedTable[]; + query: string[]; +} + +export const leaderboardSortAndRank = async ( + job: Job +): Promise => { const { blockNumber } = job.data; - const pool = await connectToDB(); + const pool: Pool = await connectToDB(); - const result = { + const result: LeaderboardResult = { blockNumber, updatedTables: [], query: [], }; try { - const client = await pool.connect(); + const client: PoolClient = await pool.connect(); try { await client.query("BEGIN"); @@ -35,7 +55,7 @@ async function leaderboardSortAndRank(job) { } catch (err) { await client.query("ROLLBACK"); console.error("Error updating rankings:", err); - throw new Error("Failed to update rankings: " + err); + throw new Error(`Failed to update rankings: ${err}`); } finally { client.release(); } @@ -43,10 +63,6 @@ async function leaderboardSortAndRank(job) { return result; } catch (err) { console.error("Error in leaderboardSortAndRank:", err); - throw new Error("Failed to sort and rank leaderboard: " + err); + throw new Error(`Failed to sort and rank leaderboard: ${err}`); } -} - -module.exports = { - leaderboardSortAndRank, }; diff --git a/indexers/taskboard/src/tasks/slackNotification.ts b/indexers/taskboard/src/tasks/slackNotification.ts new file mode 100644 index 00000000..16655367 --- /dev/null +++ b/indexers/taskboard/src/tasks/slackNotification.ts @@ -0,0 +1,80 @@ +import { sendSlackMessage } from "../utils/slack"; + +interface JobData { + title: string; + path?: string; + message?: string; + logData?: any; + messageId?: string; +} + +interface Job { + data: JobData; +} + +interface NotificationResult extends JobData { + slackMessage?: string; +} + +export const slackNotification = async ( + job: Job +): Promise => { + const { title, path, message, logData, messageId } = job.data; + let result: NotificationResult = { + title, + path, + message, + logData, + messageId, + }; + + try { + const blocks = [ + { + type: "header", + text: { + type: "plain_text", + text: title, + }, + }, + ]; + if (path) { + blocks.push({ + type: "section", + text: { + type: "mrkdwn", + text: `Path: ${path}`, + }, + }); + } + if (message) { + blocks.push({ + type: "section", + text: { + type: "mrkdwn", + text: message, + }, + }); + } + if (logData) { + blocks.push({ + type: "section", + text: { + type: "mrkdwn", + text: `\`\`\`${JSON.stringify(logData, null, 2).slice( + 0, + 25000 + )}\`\`\``, + }, + }); + } + + const slackMessage = await sendSlackMessage(title, blocks, messageId); + result.slackMessage = slackMessage; + + return result; + } catch (err) { + console.error("Error in slackNotification:", err); + throw new Error(`Failed to send slack notification: ${err}`); + } +}; diff --git a/indexers/taskboard/tasks/updateAccount.js b/indexers/taskboard/src/tasks/updateAccount.ts similarity index 54% rename from indexers/taskboard/tasks/updateAccount.js rename to indexers/taskboard/src/tasks/updateAccount.ts index a00fc4af..84ed0717 100644 --- a/indexers/taskboard/tasks/updateAccount.js +++ b/indexers/taskboard/src/tasks/updateAccount.ts @@ -1,25 +1,41 @@ -const { connectToDB, queries } = require("../utils/db"); -const { activate } = require("@autonomys/auto-utils"); -const { account, blockNumber } = require("@autonomys/auto-consensus"); +import { account, blockNumber } from "@autonomys/auto-consensus"; +import { activate, stringify } from "@autonomys/auto-utils"; +import { ApiPromise } from "@polkadot/api"; +import { Pool, PoolClient } from "pg"; +import { connectToDB, queries } from "../utils/db"; -async function updateAccount(job) { +interface Job { + data: { + networkId: string; + accountId: string; + }; +} + +interface UpdateResult { + blockNumber: typeof blockNumber; + updatedTables: string[]; + accountResult: any[]; +} + +export const updateAccount = async (job: Job): Promise => { const { networkId, accountId } = job.data; - const result = { + const result: UpdateResult = { blockNumber, updatedTables: [], - query: [], + accountResult: [], }; try { - const api = await activate({ networkId }); - const pool = await connectToDB(); + const api: ApiPromise = await activate({ networkId }); + const pool: Pool = await connectToDB(); const [accountState, currentBlockNumber] = await Promise.all([ account(api, accountId), blockNumber(api), ]); + result.accountResult.push(stringify({ accountId, accountState })); - const client = await pool.connect(); + const client: PoolClient = await pool.connect(); try { await client.query("BEGIN"); // Execute queries @@ -42,17 +58,13 @@ async function updateAccount(job) { } catch (err) { await client.query("ROLLBACK"); console.error("Error updating account balance:", err); - throw new Error("Failed to update account balance: " + err); + throw new Error(`Failed to update account balance: ${err}`); } finally { client.release(); } return result; } catch (err) { console.error("Error in updateAccountBalance:", err); - throw new Error("Failed to update account balance: " + err); + throw new Error(`Failed to update account balance: ${err}`); } -} - -module.exports = { - updateAccount, }; diff --git a/indexers/taskboard/src/types/ejs.d.ts b/indexers/taskboard/src/types/ejs.d.ts new file mode 100644 index 00000000..67ff5e58 --- /dev/null +++ b/indexers/taskboard/src/types/ejs.d.ts @@ -0,0 +1,4 @@ +declare module "*.ejs" { + const content: string; + export default content; +} diff --git a/indexers/taskboard/utils/bull.js b/indexers/taskboard/src/utils/bull.ts similarity index 50% rename from indexers/taskboard/utils/bull.js rename to indexers/taskboard/src/utils/bull.ts index 1d5b892e..b4ad7bec 100644 --- a/indexers/taskboard/utils/bull.js +++ b/indexers/taskboard/src/utils/bull.ts @@ -1,22 +1,34 @@ -const { Queue: QueueMQ, Worker } = require("bullmq"); -const { connection } = require("./store"); +import { Job, Queue as QueueMQ, Worker } from "bullmq"; +import { connection } from "./store"; -const createQueueMQ = (name) => new QueueMQ(name, { connection }); +interface Task { + handler: (job: Job) => Promise; + concurrency: number; +} + +export const createQueueMQ = (name: string): QueueMQ => + new QueueMQ(name, { connection }); -async function setupBullMQProcessor(queueName, task) { +export const setupBullMQProcessor = async ( + queueName: string, + task: Task +): Promise => { new Worker(queueName, task.handler, { connection, concurrency: task.concurrency, }) - .on("completed", (job) => { + .on("completed", (job: Job) => { console.log(`Job ${job.id} has been completed`); }) - .on("failed", (job, err) => { + .on("failed", (job: Job, err: Error) => { console.error(`Job ${job.id} has failed with error ${err.message}`); }); -} +}; -async function cleanOldJobs(queue, hours) { +export const cleanOldJobs = async ( + queue: QueueMQ, + hours: number +): Promise => { const threshold = Date.now() - hours * 60 * 60 * 1000; const jobs = await queue.getJobs(["completed", "failed"]); for (const job of jobs) { @@ -25,10 +37,4 @@ async function cleanOldJobs(queue, hours) { console.log(`Removed job ${job.id} from queue ${queue.name}`); } } -} - -module.exports = { - createQueueMQ, - setupBullMQProcessor, - cleanOldJobs, }; diff --git a/indexers/taskboard/src/utils/db.ts b/indexers/taskboard/src/utils/db.ts new file mode 100644 index 00000000..8266e503 --- /dev/null +++ b/indexers/taskboard/src/utils/db.ts @@ -0,0 +1,142 @@ +import { Pool, PoolConfig } from "pg"; + +export const connectToDB = async (): Promise => { + const dbConfig: PoolConfig = { + user: process.env.DB_USER || "postgres", + host: process.env.DB_HOST || "localhost", + database: process.env.DB_DATABASE || "postgres", + password: process.env.DB_PASSWORD || "postgres", + port: Number(process.env.DB_PORT) || 5432, + }; + + const pool = new Pool(dbConfig); + return pool; +}; + +export const entryTypeToTable = (entryType: string): string => + entryType + .replace(/([A-Z])/g, "_$1") + .toLowerCase() + .replace(/^_/, "") + "s"; + +const consensusSectionsQuery = ` + INSERT INTO consensus.sections (id, _id, _block_range) + SELECT DISTINCT section as id, + gen_random_uuid() as _id, + int8range($1::int8, $1::int8) as _block_range + FROM ( + SELECT section FROM consensus.extrinsics WHERE _block_range @> $1::int8 + UNION + SELECT section FROM consensus.events WHERE _block_range @> $1::int8 + ) combined_sections + ON CONFLICT (id) DO NOTHING + RETURNING *`; + +// Get unique extrinsic modules +const consensusExtrinsicModulesQuery = ` + INSERT INTO consensus.extrinsic_modules (id, _id, section, method, _block_range) + SELECT DISTINCT + LOWER(name) as id, + gen_random_uuid() as _id, + section, + module as method, + int8range($1::int8, $1::int8) as _block_range + FROM consensus.extrinsics + WHERE _block_range @> $1::int8 + ON CONFLICT (id) DO NOTHING + RETURNING *`; + +// Get unique event modules +const consensusEventModulesQuery = ` + INSERT INTO consensus.event_modules (id, _id, section, method, _block_range) + SELECT DISTINCT + LOWER(name) as id, + gen_random_uuid() as _id, + section, + module as method, + int8range($1::int8, $1::int8) as _block_range + FROM consensus.events + WHERE _block_range @> $1::int8 + ON CONFLICT (id) DO NOTHING + RETURNING *`; + +// Get unique log kinds +const consensusLogKindsQuery = ` + INSERT INTO consensus.log_kinds (id, _id, _block_range) + SELECT DISTINCT kind as id, + gen_random_uuid() as _id, + int8range($1::int8, $1::int8) as _block_range + FROM consensus.logs + WHERE _block_range @> $1::int8 + ON CONFLICT (id) DO NOTHING + RETURNING *`; + +// Update or insert accounts +const consensusAccountsQuery = ` + INSERT INTO consensus.accounts (id, _id, nonce, free, reserved, total, created_at, updated_at, _block_range) + SELECT DISTINCT ON (id) + id, + gen_random_uuid() as _id, + nonce, + free, + reserved, + total, + created_at, + updated_at, + int8range($1::int8, $1::int8) as _block_range + FROM consensus.account_histories + WHERE _block_range @> $1::int8 + ON CONFLICT (id) + DO UPDATE SET + nonce = EXCLUDED.nonce, + free = EXCLUDED.free, + reserved = EXCLUDED.reserved, + total = EXCLUDED.total, + created_at = EXCLUDED.created_at, + updated_at = EXCLUDED.updated_at + RETURNING *`; + +// Get unique sections from both extrinsics and events +const updateLeaderboardRanking = (table) => ` + WITH ranked_entries AS ( + SELECT id, + RANK() OVER (ORDER BY value DESC) AS new_rank + FROM leaderboard.${table} + ) + UPDATE leaderboard.${table} t + SET rank = r.new_rank + FROM ranked_entries r + WHERE t.id = r.id; + `; + +const consensusUpsertAccountQuery = ` + INSERT INTO consensus.accounts (id, _id, nonce, free, reserved, total, created_at, updated_at, _block_range) + VALUES ($1, gen_random_uuid(), $2, $3, $4, $5, $6, $6, int8range($7::int8, $7::int8)) + ON CONFLICT (id) + DO UPDATE SET + nonce = $2, + free = $3, + reserved = $4, + total = $5, + updated_at = $6 + RETURNING *`; + +interface Queries { + consensusSectionsQuery: string; + consensusExtrinsicModulesQuery: string; + consensusEventModulesQuery: string; + consensusLogKindsQuery: string; + consensusAccountsQuery: string; + updateLeaderboardRanking: (table: string) => string; + consensusUpsertAccountQuery: string; +} + +export const queries: Queries = { + consensusSectionsQuery, + consensusExtrinsicModulesQuery, + consensusEventModulesQuery, + consensusLogKindsQuery, + consensusAccountsQuery, + updateLeaderboardRanking, + consensusUpsertAccountQuery, +}; diff --git a/indexers/taskboard/src/utils/helper.ts b/indexers/taskboard/src/utils/helper.ts new file mode 100644 index 00000000..03dd7545 --- /dev/null +++ b/indexers/taskboard/src/utils/helper.ts @@ -0,0 +1,17 @@ +import type { Response } from "express"; + +export const log = (...args: any[]): void => { + if (process.env.LOG_LEVEL === "debug") console.log(...args); +}; + +export const returnError = ( + res: Response, + error: string, + statusCode: number = 400 +) => { + log("Error: ", error); + return res.send({ + statusCode, + error, + }); +}; diff --git a/indexers/taskboard/src/utils/slack.ts b/indexers/taskboard/src/utils/slack.ts new file mode 100644 index 00000000..60542d5a --- /dev/null +++ b/indexers/taskboard/src/utils/slack.ts @@ -0,0 +1,64 @@ +interface SlackResponse { + ok: boolean; + error?: string; + ts?: string; +} + +interface SlackBlock { + type: string; + text: { + type: string; + text: string; + }; +} + +interface SlackPayload { + channel: string; + text: string; + blocks: SlackBlock[]; + ts?: string; +} + +export const sendSlackMessage = async ( + message: string, + blocks: any[], + messageIdToEdit?: string +): Promise => { + const token = process.env.SLACK_TOKEN; + const conversationId = process.env.SLACK_CONVERSATION_ID || ""; + const url = messageIdToEdit + ? "https://slack.com/api/chat.update" + : "https://slack.com/api/chat.postMessage"; + + let payload: SlackPayload = { + channel: conversationId, + text: message, + blocks: blocks, + }; + + if (messageIdToEdit) { + payload.ts = messageIdToEdit; + } + + try { + const response = await fetch(url, { + method: "POST", + headers: { + "Content-Type": "application/json; charset=utf-8", + Authorization: `Bearer ${token}`, + }, + body: JSON.stringify(payload), + }); + + const data = (await response.json()) as SlackResponse; + + if (!data.ok) { + throw new Error(data.error); + } + + return data.ts; + } catch (e) { + console.error("Error sending slack message", e); + return undefined; + } +}; diff --git a/indexers/taskboard/utils/store.js b/indexers/taskboard/src/utils/store.ts similarity index 51% rename from indexers/taskboard/utils/store.js rename to indexers/taskboard/src/utils/store.ts index ee2b1556..19ba730e 100644 --- a/indexers/taskboard/utils/store.js +++ b/indexers/taskboard/src/utils/store.ts @@ -1,30 +1,34 @@ -const Redis = require("ioredis"); +import Redis from "ioredis"; -const connection = { - port: process.env.REDIS_PORT ?? 6379, +interface RedisConnection { + port: number; + host: string; + retryStrategy: (times: number) => number; +} + +export const connection: RedisConnection = { + port: process.env.REDIS_PORT ? parseInt(process.env.REDIS_PORT) : 6379, host: process.env.REDIS_HOST ?? "localhost", - retryStrategy: (times) => { + retryStrategy: (times: number): number => { const delay = Math.min(times * 50, 2000); console.log(`Retrying Redis connection in ${delay}ms...`); return delay; }, }; -async function checkRedisConnection() { +export const checkRedisConnection = async (): Promise => { const redis = new Redis(connection); - return new Promise((resolve, reject) => { + return new Promise((resolve, reject) => { redis.on("ready", () => { console.log("Connected to Redis successfully!"); redis.disconnect(); resolve(); }); - redis.on("error", (err) => { + redis.on("error", (err: Error) => { console.error("Redis connection error:", err); redis.disconnect(); reject(err); }); }); -} - -module.exports = { connection, checkRedisConnection }; +}; diff --git a/indexers/taskboard/src/views/index.ejs b/indexers/taskboard/src/views/index.ejs new file mode 100644 index 00000000..968e2774 --- /dev/null +++ b/indexers/taskboard/src/views/index.ejs @@ -0,0 +1,45 @@ + + + + Bull Board Dashboard + + + +

Bull Board Queues

+
    + <% queues.forEach(function(queue) { %> +
  • + <%= queue %> +
  • + <% }); %> +
+ + diff --git a/indexers/taskboard/views/login.ejs b/indexers/taskboard/src/views/login.ejs similarity index 93% rename from indexers/taskboard/views/login.ejs rename to indexers/taskboard/src/views/login.ejs index e5ab5ce0..d5f61393 100644 --- a/indexers/taskboard/views/login.ejs +++ b/indexers/taskboard/src/views/login.ejs @@ -77,11 +77,10 @@