Skip to content

Commit

Permalink
refactor(webserver): Use rate limiters instead of semaphore in Obstac…
Browse files Browse the repository at this point in the history
…leImagesCapabilityRouter
  • Loading branch information
Hypfer committed Sep 14, 2024
1 parent 7a6eb1e commit 90e5ab9
Showing 1 changed file with 35 additions and 61 deletions.
Original file line number Diff line number Diff line change
@@ -1,16 +1,25 @@
const CapabilityRouter = require("./CapabilityRouter");
const RateLimit = require("express-rate-limit");
const Semaphore = require("semaphore");
const {IMAGE_FILE_FORMAT} = require("../../utils/const");

class ObstacleImagesCapabilityRouter extends CapabilityRouter {
preInit() {
// Max two simultaneous image transmissions to ensure a small resource footprint
this.semaphore = Semaphore(2);
this.primaryLimiter = RateLimit.rateLimit({
windowMs: 1000,
max: 3,
keyGenerator: () => "global",
});

this.secondaryLimiter = RateLimit.rateLimit({
windowMs: 5 * 1000,
max: 10,
keyGenerator: () => "global",
});

this.limiter = RateLimit.rateLimit({
windowMs: 30*1000,
max: 30
this.tertiaryLimiter = RateLimit.rateLimit({
windowMs: 20*1000,
max: 30,
keyGenerator: () => "global",
});
}

Expand Down Expand Up @@ -45,68 +54,33 @@ class ObstacleImagesCapabilityRouter extends CapabilityRouter {
}
});

this.router.get("/img/:id", this.limiter, async (req, res) => {
let imageStream;
let requestIsClosed = false;
let hasExitedSemaphore = false;

req.socket.on("close", (asdf) => {
requestIsClosed = true;
});

await new Promise((resolve) => {
this.semaphore.take(() => {
resolve();
});
});

try {
imageStream = await this.capability.getStreamForId(req.params.id);
} catch (e) {
this.semaphore.leave();
return this.sendErrorResponse(req, res, e);
}

if (imageStream === null) {
this.semaphore.leave();
return res.sendStatus(404);
}

res.setHeader("Content-Type", CONTENT_HEADER_MAPPING[this.capability.getProperties().fileFormat]);
res.setHeader("Content-Disposition", "inline");

imageStream.pipe(res);

imageStream.on("error", (error) => {
if (!hasExitedSemaphore) {
hasExitedSemaphore = true;

this.semaphore.leave();
this.router.get(
"/img/:id",
this.primaryLimiter,
this.secondaryLimiter,
this.tertiaryLimiter,
async (req, res) => {
let imageStream;
try {
imageStream = await this.capability.getStreamForId(req.params.id);
} catch (e) {
return this.sendErrorResponse(req, res, e);
}

res.sendStatus(500);
});

imageStream.on("close", () => {
if (!hasExitedSemaphore) {
hasExitedSemaphore = true;

this.semaphore.leave();
if (imageStream === null) {
return res.sendStatus(404);
}
});

// Without this, aborted requests never properly clean up the imageStream nor do they leave the semaphore
if (requestIsClosed) {
imageStream.destroy();
res.end();
res.setHeader("Content-Type", CONTENT_HEADER_MAPPING[this.capability.getProperties().fileFormat]);
res.setHeader("Content-Disposition", "inline");

if (!hasExitedSemaphore) {
hasExitedSemaphore = true;
imageStream.pipe(res);

this.semaphore.leave();
}
imageStream.on("error", (error) => {
res.sendStatus(500);
});
}
});
);
}
}

Expand Down

0 comments on commit 90e5ab9

Please sign in to comment.