Skip to content

Commit

Permalink
Added route which runs every 15 min to process pending scans via invoke
Browse files Browse the repository at this point in the history
  • Loading branch information
heythisischris committed Aug 25, 2024
1 parent c30d332 commit a9516cd
Show file tree
Hide file tree
Showing 5 changed files with 57 additions and 49 deletions.
51 changes: 18 additions & 33 deletions src/internal/processScans.ts
Original file line number Diff line number Diff line change
@@ -1,21 +1,24 @@
import { chunk, db, isStaging, sleep } from '#src/utils';
import { LambdaClient, InvokeCommand } from '@aws-sdk/client-lambda';
const lambda = new LambdaClient();
import { chunk, db, sleep } from '#src/utils';

export const processScans = async (event) => {
console.log(`START PROCESS SCANS`);
const startTime = new Date().getTime();
await db.connect();
const { jobIds, userId, propertyId, lambdaRun } = event;
const { userId, propertyId } = event;
const jobIds = (await db.query({
text: `SELECT "job_id" FROM "scans" WHERE "user_id"=$1 AND "property_id"=$2 AND "processing" = TRUE`,
values: [userId, propertyId],
})).rows.map(obj => obj.job_id);
const allNodeIds = [];
const pollScans = (givenJobIds) => new Promise(async (finalRes) => {
await sleep(2500);
await sleep(1000);
const remainingScans = [];
const batchesOfJobIds = chunk(givenJobIds, 100);
for (const [index, batchOfJobIds] of batchesOfJobIds.entries()) {
console.log(`Start ${index} of ${batchesOfJobIds.length} batches`);
await Promise.allSettled(batchOfJobIds.map(jobId => new Promise(async (res) => {
try {
const scanResults = await fetch(`https://scan.equalify.app/results/${jobId}`, { signal: AbortSignal.timeout(500) });
const scanResults = await fetch(`https://scan.equalify.app/results/${jobId}`, { signal: AbortSignal.timeout(10000) });
const { result, status } = await scanResults.json();
if (['delayed', 'active', 'waiting'].includes(status)) {
remainingScans.push(jobId);
Expand All @@ -29,48 +32,30 @@ export const processScans = async (event) => {
}
}
catch (err) {
// console.log(err);
console.log(err);
remainingScans.push(jobId);
}
res(1);
})));
}
const stats = { userId, lambdaRun, remainingScans: remainingScans.length };
const stats = { userId, remainingScans: remainingScans.length };
console.log(JSON.stringify(stats));
if (remainingScans.length > 0) {
const currentTime = new Date().getTime();
const deltaTime = currentTime - startTime;
const twoMinutes = 2 * 60 * 1000;
if (deltaTime <= twoMinutes) {
const tenMinutes = 10 * 60 * 1000;
if (deltaTime <= tenMinutes) {
await pollScans(remainingScans);
}
else if (deltaTime > twoMinutes) {
else if (deltaTime > tenMinutes) {
const scansExist = (await db.query({
text: `SELECT "id" FROM "scans" WHERE "job_id" = ANY($1) LIMIT 1`,
values: [jobIds],
})).rows?.[0]?.id;
if (lambdaRun < 750 && scansExist) {
console.log(JSON.stringify({ message: `Terminating processScans early, invoking Lambda again.`, ...stats }));
lambda.send(new InvokeCommand({
FunctionName: `equalify-api${isStaging ? '-staging' : ''}`,
InvocationType: "Event",
Payload: Buffer.from(JSON.stringify({
path: '/internal/processScans',
jobIds: remainingScans,
userId: userId,
propertyId: propertyId,
lambdaRun: lambdaRun + 1,
})),
}));
return;
}
else if (!scansExist) {
console.log(JSON.stringify({ message: `Scans were deleted, ending Lambda loop.`, ...stats }));
return;
}
else {
console.log(JSON.stringify({ message: `No more Lambda runs, we've maxed out at ~24 hours.`, ...stats }));
return;
if (scansExist) {
const message = `10 minutes reached, terminating processScans early`;
console.log(JSON.stringify({ message, ...stats }));
throw new Error(message);
}
}
}
Expand Down
23 changes: 9 additions & 14 deletions src/routes/addScans.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ export const addScans = async ({ request, reply }) => {

await db.connect();
for (const propertyId of request.body.propertyIds ?? []) {
const jobIds = [];
const property = (await db.query(`SELECT "id", "discovery", "property_url" FROM "properties" WHERE "id"=$1`, [propertyId])).rows?.[0];
if (!property.discovery) {
return {
Expand Down Expand Up @@ -54,25 +53,21 @@ export const addScans = async ({ request, reply }) => {
text: `INSERT INTO "scans" ("user_id", "property_id", "url_id", "job_id") VALUES ($1, $2, $3, $4) RETURNING "job_id"`,
values: [jwtClaims.sub, propertyId, urlId, parseInt(jobId)]
})).rows[0];
jobIds.push(scan.job_id);
}
}
}
catch (err) {
console.log(err);
}

lambda.send(new InvokeCommand({
FunctionName: `equalify-api${isStaging ? '-staging' : ''}`,
InvocationType: "Event",
Payload: Buffer.from(JSON.stringify({
path: '/internal/processScans',
jobIds: jobIds,
userId: jwtClaims.sub,
propertyId: propertyId,
lambdaRun: 1,
})),
}));
// lambda.send(new InvokeCommand({
// FunctionName: `equalify-api${isStaging ? '-staging' : ''}`,
// InvocationType: "Event",
// Payload: Buffer.from(JSON.stringify({
// path: '/internal/processScans',
// userId: jwtClaims.sub,
// propertyId: propertyId,
// })),
// }));
}
await db.clean();

Expand Down
5 changes: 4 additions & 1 deletion src/scheduled.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
import { runEveryMinute } from '#src/scheduled/index';
import { runEveryFifteenMinutes, runEveryMinute } from '#src/scheduled/index';

export const scheduled = async (event) => {
if (event.path.endsWith('/runEveryMinute')) {
return runEveryMinute();
}
else if (event.path.endsWith('/runEveryFifteenMinutes')) {
return runEveryFifteenMinutes();
}
}
3 changes: 2 additions & 1 deletion src/scheduled/index.ts
Original file line number Diff line number Diff line change
@@ -1 +1,2 @@
export * from './runEveryMinute'
export * from './runEveryMinute'
export * from './runEveryFifteenMinutes'
24 changes: 24 additions & 0 deletions src/scheduled/runEveryFifteenMinutes.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
import { db, isStaging } from '#src/utils';
import { LambdaClient, InvokeCommand } from '@aws-sdk/client-lambda';
const lambda = new LambdaClient();

export const runEveryFifteenMinutes = async () => {
await db.connect();
const pendingScans = (await db.query({
text: `SELECT DISTINCT "user_id", "property_id" FROM "scans" WHERE "processing" = true`,
})).rows;
await db.clean();
console.log(JSON.stringify({ pendingScans }));
for (const { user_id, property_id } of pendingScans) {
lambda.send(new InvokeCommand({
FunctionName: `equalify-api${isStaging ? '-staging' : ''}`,
InvocationType: "Event",
Payload: Buffer.from(JSON.stringify({
path: '/internal/processScans',
userId: user_id,
propertyId: property_id,
})),
}));
}
return;
}

0 comments on commit a9516cd

Please sign in to comment.