Skip to content

Commit

Permalink
moving kafka to worker threads (#1963)
Browse files Browse the repository at this point in the history
  • Loading branch information
chitalian authored May 20, 2024
1 parent 5c4ee59 commit e92e2e8
Show file tree
Hide file tree
Showing 4 changed files with 36 additions and 14 deletions.
18 changes: 8 additions & 10 deletions valhalla/jawn/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,17 +4,16 @@ require("dotenv").config({

import express, { NextFunction } from "express";
import swaggerUi from "swagger-ui-express";
import { Worker } from "worker_threads";
import { tokenRouter } from "./lib/routers/tokenRouter";
import { runLoopsOnce, runMainLoops } from "./mainLoops";
import { authMiddleware } from "./middleware/auth";
import { IS_RATE_LIMIT_ENABLED, limiter } from "./middleware/ratelimitter";
import { RegisterRoutes as registerPrivateTSOARoutes } from "./tsoa-build/private/routes";
import { RegisterRoutes as registerPublicTSOARoutes } from "./tsoa-build/public/routes";
import * as publicSwaggerDoc from "./tsoa-build/public/swagger.json";
import { initLogs } from "./utils/injectLogs";
import { initSentry } from "./utils/injectSentry";
import { IS_RATE_LIMIT_ENABLED, limiter } from "./middleware/ratelimitter";
import { tokenRouter } from "./lib/routers/tokenRouter";
import { consume, consumeDlq } from "./lib/clients/KafkaConsumer";
import { startConsumers } from "./workers/consumerInterface";

export const ENVIRONMENT: "production" | "development" = (process.env
.VERCEL_ENV ?? "development") as any;
Expand All @@ -41,13 +40,12 @@ const app = express();

const KAFKA_CREDS = JSON.parse(process.env.KAFKA_CREDS ?? "{}");
const KAFKA_ENABLED = (KAFKA_CREDS?.KAFKA_ENABLED ?? "false") === "true";

if (KAFKA_ENABLED) {
consume();
consumeDlq();
}
if (KAFKA_ENABLED) {
const worker = new Worker(`${__dirname}/workers/kafkaConsumer.js`);
worker.postMessage("start");
startConsumers({
dlqCount: 1,
normalCount: 1,
});
}

app.get("/healthcheck", (req, res) => {
Expand Down
8 changes: 5 additions & 3 deletions valhalla/jawn/src/lib/clients/KafkaConsumer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -279,7 +279,9 @@ export const consumeDlq = async () => {
heartbeat,
commitOffsetsIfNecessary,
}) => {
console.log(`Received batch with ${batch.messages.length} messages.`);
console.log(
`DLQ: Received batch with ${batch.messages.length} messages.`
);
const maxMessages = 300;
const miniBatches = createMiniBatches(batch.messages, maxMessages);

Expand All @@ -292,7 +294,7 @@ export const consumeDlq = async () => {

const mappedMessages = mapDlqKafkaMessageToMessage(miniBatch);
if (mappedMessages.error || !mappedMessages.data) {
console.error("Failed to map messages", mappedMessages.error);
console.error("DLQ: Failed to map messages", mappedMessages.error);
return;
}

Expand All @@ -305,7 +307,7 @@ export const consumeDlq = async () => {
"request-response-logs-prod-dlq"
);
if (consumeResult.error) {
console.error("Failed to consume batch", consumeResult.error);
console.error("DLQ: Failed to consume batch", consumeResult.error);
// TODO: Best way to handle this?
}

Expand Down
19 changes: 19 additions & 0 deletions valhalla/jawn/src/workers/consumerInterface.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
import { Worker } from "worker_threads";

export function startConsumers({
normalCount,
dlqCount,
}: {
normalCount: number;
dlqCount: number;
}) {
for (let i = 0; i < normalCount; i++) {
const worker = new Worker(`${__dirname}/kafkaConsumer.js`);
worker.postMessage("start");
}

for (let i = 0; i < dlqCount; i++) {
const workerDlq = new Worker(`${__dirname}/kafkaConsumer.js`);
workerDlq.postMessage("start-dlq");
}
}
5 changes: 4 additions & 1 deletion valhalla/jawn/src/workers/kafkaConsumer.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,12 @@
import { parentPort } from "worker_threads";
import { consume } from "./../lib/clients/KafkaConsumer";
import { consume, consumeDlq } from "./../lib/clients/KafkaConsumer";

parentPort?.once("message", (message) => {
if (message === "start") {
console.log("Kafka consumer thread started!");
consume();
} else if (message === "start-dlq") {
console.log("Kafka DLQ consumer thread started!");
consumeDlq();
}
});

0 comments on commit e92e2e8

Please sign in to comment.