From a9516cde8dfce25bba824fdc959e49ff76d560fa Mon Sep 17 00:00:00 2001 From: Christopher Aitken Date: Sat, 24 Aug 2024 21:01:18 -0500 Subject: [PATCH] Added route which runs every 15 min to process pending scans via invoke --- src/internal/processScans.ts | 51 +++++++++---------------- src/routes/addScans.ts | 23 +++++------ src/scheduled.ts | 5 ++- src/scheduled/index.ts | 3 +- src/scheduled/runEveryFifteenMinutes.ts | 24 ++++++++++++ 5 files changed, 57 insertions(+), 49 deletions(-) create mode 100644 src/scheduled/runEveryFifteenMinutes.ts diff --git a/src/internal/processScans.ts b/src/internal/processScans.ts index 7517123..ea790d3 100644 --- a/src/internal/processScans.ts +++ b/src/internal/processScans.ts @@ -1,21 +1,24 @@ -import { chunk, db, isStaging, sleep } from '#src/utils'; -import { LambdaClient, InvokeCommand } from '@aws-sdk/client-lambda'; -const lambda = new LambdaClient(); +import { chunk, db, sleep } from '#src/utils'; export const processScans = async (event) => { console.log(`START PROCESS SCANS`); const startTime = new Date().getTime(); await db.connect(); - const { jobIds, userId, propertyId, lambdaRun } = event; + const { userId, propertyId } = event; + const jobIds = (await db.query({ + text: `SELECT "job_id" FROM "scans" WHERE "user_id"=$1 AND "property_id"=$2 AND "processing" = TRUE`, + values: [userId, propertyId], + })).rows.map(obj => obj.job_id); const allNodeIds = []; const pollScans = (givenJobIds) => new Promise(async (finalRes) => { - await sleep(2500); + await sleep(1000); const remainingScans = []; const batchesOfJobIds = chunk(givenJobIds, 100); for (const [index, batchOfJobIds] of batchesOfJobIds.entries()) { + console.log(`Start ${index} of ${batchesOfJobIds.length} batches`); await Promise.allSettled(batchOfJobIds.map(jobId => new Promise(async (res) => { try { - const scanResults = await fetch(`https://scan.equalify.app/results/${jobId}`, { signal: AbortSignal.timeout(500) }); + const scanResults = await fetch(`https://scan.equalify.app/results/${jobId}`, { signal: AbortSignal.timeout(10000) }); const { result, status } = await scanResults.json(); if (['delayed', 'active', 'waiting'].includes(status)) { remainingScans.push(jobId); @@ -29,48 +32,30 @@ export const processScans = async (event) => { } } catch (err) { - // console.log(err); + console.log(err); remainingScans.push(jobId); } res(1); }))); } - const stats = { userId, lambdaRun, remainingScans: remainingScans.length }; + const stats = { userId, remainingScans: remainingScans.length }; console.log(JSON.stringify(stats)); if (remainingScans.length > 0) { const currentTime = new Date().getTime(); const deltaTime = currentTime - startTime; - const twoMinutes = 2 * 60 * 1000; - if (deltaTime <= twoMinutes) { + const tenMinutes = 10 * 60 * 1000; + if (deltaTime <= tenMinutes) { await pollScans(remainingScans); } - else if (deltaTime > twoMinutes) { + else if (deltaTime > tenMinutes) { const scansExist = (await db.query({ text: `SELECT "id" FROM "scans" WHERE "job_id" = ANY($1) LIMIT 1`, values: [jobIds], })).rows?.[0]?.id; - if (lambdaRun < 750 && scansExist) { - console.log(JSON.stringify({ message: `Terminating processScans early, invoking Lambda again.`, ...stats })); - lambda.send(new InvokeCommand({ - FunctionName: `equalify-api${isStaging ? '-staging' : ''}`, - InvocationType: "Event", - Payload: Buffer.from(JSON.stringify({ - path: '/internal/processScans', - jobIds: remainingScans, - userId: userId, - propertyId: propertyId, - lambdaRun: lambdaRun + 1, - })), - })); - return; - } - else if (!scansExist) { - console.log(JSON.stringify({ message: `Scans were deleted, ending Lambda loop.`, ...stats })); - return; - } - else { - console.log(JSON.stringify({ message: `No more Lambda runs, we've maxed out at ~24 hours.`, ...stats })); - return; + if (scansExist) { + const message = `10 minutes reached, terminating processScans early`; + console.log(JSON.stringify({ message, ...stats })); + throw new Error(message); } } } diff --git a/src/routes/addScans.ts b/src/routes/addScans.ts index 675bc80..e5a698b 100644 --- a/src/routes/addScans.ts +++ b/src/routes/addScans.ts @@ -13,7 +13,6 @@ export const addScans = async ({ request, reply }) => { await db.connect(); for (const propertyId of request.body.propertyIds ?? []) { - const jobIds = []; const property = (await db.query(`SELECT "id", "discovery", "property_url" FROM "properties" WHERE "id"=$1`, [propertyId])).rows?.[0]; if (!property.discovery) { return { @@ -54,25 +53,21 @@ export const addScans = async ({ request, reply }) => { text: `INSERT INTO "scans" ("user_id", "property_id", "url_id", "job_id") VALUES ($1, $2, $3, $4) RETURNING "job_id"`, values: [jwtClaims.sub, propertyId, urlId, parseInt(jobId)] })).rows[0]; - jobIds.push(scan.job_id); } } } catch (err) { console.log(err); } - - lambda.send(new InvokeCommand({ - FunctionName: `equalify-api${isStaging ? '-staging' : ''}`, - InvocationType: "Event", - Payload: Buffer.from(JSON.stringify({ - path: '/internal/processScans', - jobIds: jobIds, - userId: jwtClaims.sub, - propertyId: propertyId, - lambdaRun: 1, - })), - })); + // lambda.send(new InvokeCommand({ + // FunctionName: `equalify-api${isStaging ? '-staging' : ''}`, + // InvocationType: "Event", + // Payload: Buffer.from(JSON.stringify({ + // path: '/internal/processScans', + // userId: jwtClaims.sub, + // propertyId: propertyId, + // })), + // })); } await db.clean(); diff --git a/src/scheduled.ts b/src/scheduled.ts index 794dcd4..accd92e 100644 --- a/src/scheduled.ts +++ b/src/scheduled.ts @@ -1,7 +1,10 @@ -import { runEveryMinute } from '#src/scheduled/index'; +import { runEveryFifteenMinutes, runEveryMinute } from '#src/scheduled/index'; export const scheduled = async (event) => { if (event.path.endsWith('/runEveryMinute')) { return runEveryMinute(); } + else if (event.path.endsWith('/runEveryFifteenMinutes')) { + return runEveryFifteenMinutes(); + } } diff --git a/src/scheduled/index.ts b/src/scheduled/index.ts index 7503871..4584174 100644 --- a/src/scheduled/index.ts +++ b/src/scheduled/index.ts @@ -1 +1,2 @@ -export * from './runEveryMinute' \ No newline at end of file +export * from './runEveryMinute' +export * from './runEveryFifteenMinutes' \ No newline at end of file diff --git a/src/scheduled/runEveryFifteenMinutes.ts b/src/scheduled/runEveryFifteenMinutes.ts new file mode 100644 index 0000000..44ba702 --- /dev/null +++ b/src/scheduled/runEveryFifteenMinutes.ts @@ -0,0 +1,24 @@ +import { db, isStaging } from '#src/utils'; +import { LambdaClient, InvokeCommand } from '@aws-sdk/client-lambda'; +const lambda = new LambdaClient(); + +export const runEveryFifteenMinutes = async () => { + await db.connect(); + const pendingScans = (await db.query({ + text: `SELECT DISTINCT "user_id", "property_id" FROM "scans" WHERE "processing" = true`, + })).rows; + await db.clean(); + console.log(JSON.stringify({ pendingScans })); + for (const { user_id, property_id } of pendingScans) { + lambda.send(new InvokeCommand({ + FunctionName: `equalify-api${isStaging ? '-staging' : ''}`, + InvocationType: "Event", + Payload: Buffer.from(JSON.stringify({ + path: '/internal/processScans', + userId: user_id, + propertyId: property_id, + })), + })); + } + return; +} \ No newline at end of file