Skip to content

Commit

Permalink
Proposed consumer changes (#1912)
Browse files Browse the repository at this point in the history
  • Loading branch information
chitalian authored May 14, 2024
1 parent b877b53 commit 0ce970a
Show file tree
Hide file tree
Showing 4 changed files with 32 additions and 3 deletions.
5 changes: 5 additions & 0 deletions valhalla/jawn/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ require("dotenv").config({

import express, { NextFunction } from "express";
import swaggerUi from "swagger-ui-express";
import { Worker } from "worker_threads";

import { runLoopsOnce, runMainLoops } from "./mainLoops";
import { authMiddleware } from "./middleware/auth";
Expand Down Expand Up @@ -45,6 +46,10 @@ if (KAFKA_ENABLED) {
consume();
// consumeDlq();
}
if (KAFKA_ENABLED) {
const worker = new Worker(`${__dirname}/workers/kafkaConsumer.js`);
worker.postMessage("start");
}

app.get("/healthcheck", (req, res) => {
res.json({
Expand Down
17 changes: 14 additions & 3 deletions valhalla/jawn/src/lib/clients/KafkaConsumer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -46,11 +46,19 @@ if (KAFKA_ENABLED && KAFKA_BROKER && KAFKA_USERNAME && KAFKA_PASSWORD) {
}
}

const AVG_MESSAGE_SIZE = 2_000; // 2kB
const ESTIMATED_MINI_BATCH_COUNT = 3; // 3
const MESSAGES_PER_MINI_BATCH = 300;

// Average message is 1kB, so we can set minBytes to 1kB and maxBytes to 10kB
const consumer = kafka?.consumer({
groupId: "jawn-consumer",
minBytes: 100_000,
maxBytes: 10_000_000,
maxBytes:
AVG_MESSAGE_SIZE *
MESSAGES_PER_MINI_BATCH *
ESTIMATED_MINI_BATCH_COUNT *
1.1, // 10% buffer
});

export const consume = async () => {
Expand Down Expand Up @@ -90,8 +98,11 @@ export const consume = async () => {
commitOffsetsIfNecessary,
}) => {
console.log(`Received batch with ${batch.messages.length} messages.`);
const maxMessages = 150;
const miniBatches = createMiniBatches(batch.messages, maxMessages);

const miniBatches = createMiniBatches(
batch.messages,
MESSAGES_PER_MINI_BATCH
);

for (const miniBatch of miniBatches) {
const firstOffset = miniBatch[0].offset;
Expand Down
4 changes: 4 additions & 0 deletions valhalla/jawn/src/workers/kafkaConsumer.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
const path = require("path");

require("ts-node").register();
require(path.resolve(__dirname, "./kafkaConsumer.ts"));
9 changes: 9 additions & 0 deletions valhalla/jawn/src/workers/kafkaConsumer.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
import { parentPort } from "worker_threads";
import { consume } from "./../lib/clients/KafkaConsumer";

parentPort?.once("message", (message) => {
if (message === "start") {
console.log("Kafka consumer thread started!");
consume();
}
});

0 comments on commit 0ce970a

Please sign in to comment.