From 0ce970a07cd1291630a5234d2fabb28616a71f85 Mon Sep 17 00:00:00 2001 From: Justin Torre Date: Mon, 13 May 2024 17:54:25 -0700 Subject: [PATCH] Proposed consumer changes (#1912) --- valhalla/jawn/src/index.ts | 5 +++++ valhalla/jawn/src/lib/clients/KafkaConsumer.ts | 17 ++++++++++++++--- valhalla/jawn/src/workers/kafkaConsumer.js | 4 ++++ valhalla/jawn/src/workers/kafkaConsumer.ts | 9 +++++++++ 4 files changed, 32 insertions(+), 3 deletions(-) create mode 100644 valhalla/jawn/src/workers/kafkaConsumer.js create mode 100644 valhalla/jawn/src/workers/kafkaConsumer.ts diff --git a/valhalla/jawn/src/index.ts b/valhalla/jawn/src/index.ts index 94efd99dde..874525b85e 100644 --- a/valhalla/jawn/src/index.ts +++ b/valhalla/jawn/src/index.ts @@ -4,6 +4,7 @@ require("dotenv").config({ import express, { NextFunction } from "express"; import swaggerUi from "swagger-ui-express"; +import { Worker } from "worker_threads"; import { runLoopsOnce, runMainLoops } from "./mainLoops"; import { authMiddleware } from "./middleware/auth"; @@ -45,6 +46,10 @@ if (KAFKA_ENABLED) { consume(); // consumeDlq(); } +if (KAFKA_ENABLED) { + const worker = new Worker(`${__dirname}/workers/kafkaConsumer.js`); + worker.postMessage("start"); +} app.get("/healthcheck", (req, res) => { res.json({ diff --git a/valhalla/jawn/src/lib/clients/KafkaConsumer.ts b/valhalla/jawn/src/lib/clients/KafkaConsumer.ts index d3b3ef9777..6a76bc14f9 100644 --- a/valhalla/jawn/src/lib/clients/KafkaConsumer.ts +++ b/valhalla/jawn/src/lib/clients/KafkaConsumer.ts @@ -46,11 +46,19 @@ if (KAFKA_ENABLED && KAFKA_BROKER && KAFKA_USERNAME && KAFKA_PASSWORD) { } } +const AVG_MESSAGE_SIZE = 2_000; // 2kB +const ESTIMATED_MINI_BATCH_COUNT = 3; // 3 +const MESSAGES_PER_MINI_BATCH = 300; + // Average message is 1kB, so we can set minBytes to 1kB and maxBytes to 10kB const consumer = kafka?.consumer({ groupId: "jawn-consumer", minBytes: 100_000, - maxBytes: 10_000_000, + maxBytes: + AVG_MESSAGE_SIZE * + MESSAGES_PER_MINI_BATCH * + ESTIMATED_MINI_BATCH_COUNT * + 1.1, // 10% buffer }); export const consume = async () => { @@ -90,8 +98,11 @@ export const consume = async () => { commitOffsetsIfNecessary, }) => { console.log(`Received batch with ${batch.messages.length} messages.`); - const maxMessages = 150; - const miniBatches = createMiniBatches(batch.messages, maxMessages); + + const miniBatches = createMiniBatches( + batch.messages, + MESSAGES_PER_MINI_BATCH + ); for (const miniBatch of miniBatches) { const firstOffset = miniBatch[0].offset; diff --git a/valhalla/jawn/src/workers/kafkaConsumer.js b/valhalla/jawn/src/workers/kafkaConsumer.js new file mode 100644 index 0000000000..4095529f8f --- /dev/null +++ b/valhalla/jawn/src/workers/kafkaConsumer.js @@ -0,0 +1,4 @@ +const path = require("path"); + +require("ts-node").register(); +require(path.resolve(__dirname, "./kafkaConsumer.ts")); diff --git a/valhalla/jawn/src/workers/kafkaConsumer.ts b/valhalla/jawn/src/workers/kafkaConsumer.ts new file mode 100644 index 0000000000..ef25487803 --- /dev/null +++ b/valhalla/jawn/src/workers/kafkaConsumer.ts @@ -0,0 +1,9 @@ +import { parentPort } from "worker_threads"; +import { consume } from "./../lib/clients/KafkaConsumer"; + +parentPort?.once("message", (message) => { + if (message === "start") { + console.log("Kafka consumer thread started!"); + consume(); + } +});