/*
 * src/scheduled/processScans.ts
 *
 * Post-patch contents of the file as introduced by commit b815ceda
 * ("Cont. work on processScans, handling diff. statuses, matching result IDs"),
 * reformatted, with the fixes described in the comments below.
 */
import { jwtClaims } from 'app';
import { pgClient } from 'utils';

/** Base URL of the scan service endpoint that reports a job's status/result. */
const SCAN_RESULTS_URL = 'https://scan.equalify.app/results';

/** Statuses meaning the job has not finished yet; the scan is left untouched. */
const PENDING_STATUSES = ['delayed', 'active', 'waiting'];

/** Statuses meaning the job will never produce a result; the scan row is deleted. */
const DEAD_STATUSES = ['failed', 'unknown'];

/** Columns of a "scans" row that this module reads. */
interface PendingScan {
    id: unknown;
    job_id: unknown;
    property_id: unknown;
}

/**
 * The parts of a completed scan result that this module reads. Each entry
 * gets its database `id` written onto it by `scanProcessor`.
 */
interface ScanResult {
    urls?: { id?: unknown; url: unknown }[];
    messages?: { id?: unknown; message: unknown; type: unknown }[];
    tags?: { id?: unknown; tag: unknown }[];
    nodes?: { id?: unknown; html: unknown; targets: unknown; url_id: unknown }[];
}

/**
 * Polls the scan service for every scan still flagged as processing and
 * acts on the reported job status:
 *
 * - delayed / active / waiting: skipped (left for a later run)
 * - failed / unknown:           the scan row is deleted
 * - completed:                  the result is handed to `scanProcessor`
 *
 * A failure while handling one scan is logged and does not stop the loop.
 * The database client is always cleaned up, even if the initial query throws.
 */
export const processScans = async () => {
    // This route is called once every minute by Amazon EventBridge Scheduler
    await pgClient.connect();
    try {
        const scans: PendingScan[] = (await pgClient.query(
            `SELECT "id", "job_id", "property_id" FROM "scans" WHERE "processing"=TRUE ORDER BY "created_at" DESC`
        )).rows;

        for (const scan of scans) {
            try {
                const response = await fetch(`${SCAN_RESULTS_URL}/${scan.job_id}`);
                const { result, status } = await response.json();

                if (PENDING_STATUSES.includes(status)) {
                    console.log(`Scan ${scan.id} is ${status}- scan skipped.`);
                }
                else if (DEAD_STATUSES.includes(status)) {
                    await pgClient.query(`DELETE FROM "scans" WHERE "id"=$1`, [scan.id]);
                    console.log(`Scan ${scan.id} is ${status}- scan deleted.`);
                }
                else if (status === 'completed') {
                    await scanProcessor({ result, scan });
                    console.log(`Scan ${scan.id} is ${status}- scan complete!`);
                }
                else {
                    // Previously an unexpected status was dropped without a trace.
                    console.log(`Scan ${scan.id} has unrecognized status "${status}"- scan skipped.`);
                }
            }
            catch (err) {
                // Anything can be thrown, so narrow before reading `.message`.
                const message = err instanceof Error ? err.message : String(err);
                console.log(`Scan ${scan.id} ran into an error: ${message}`);
            }
        }
    }
    finally {
        // Release the client even when the SELECT above throws.
        await pgClient.clean();
    }
    return;
};

/**
 * Returns the "id" of the row in `table` whose `columns` equal `values`,
 * inserting that row first if no match exists.
 *
 * `table` and `columns` are interpolated into the SQL text as quoted
 * identifiers, so they must only ever be hard-coded names; all data goes
 * through bind parameters.
 *
 * @param table   Table name (trusted constant).
 * @param columns Column names (trusted constants), parallel to `values`.
 * @param values  Values bound to $1..$n in both the SELECT and the INSERT.
 * @returns The existing or newly created id, or `undefined` if the INSERT returned no row.
 */
const findOrCreateId = async (table: string, columns: string[], values: unknown[]): Promise<unknown> => {
    const conditions = columns.map((column, index) => `"${column}"=$${index + 1}`).join(' AND ');
    const existingId = (await pgClient.query({
        text: `SELECT "id" FROM "${table}" WHERE ${conditions}`,
        values,
    })).rows?.[0]?.id;
    if (existingId != null) {
        return existingId;
    }

    const columnList = columns.map((column) => `"${column}"`).join(', ');
    const placeholders = columns.map((_, index) => `$${index + 1}`).join(', ');
    return (await pgClient.query({
        text: `INSERT INTO "${table}" (${columnList}) VALUES (${placeholders}) RETURNING "id"`,
        values,
    })).rows?.[0]?.id;
};

/**
 * Stores a completed scan's result and resolves a database id for each url,
 * message, tag and node it contains, writing that id onto the entry
 * (`row.id`). Finally deletes the scan row.
 *
 * @param result Result payload returned by the scan service for a completed job.
 * @param scan   The "scans" row being processed.
 * @throws Error if `result` is missing; nothing is written in that case.
 */
const scanProcessor = async ({ result, scan }: { result: ScanResult; scan: PendingScan }) => {
    if (result == null) {
        throw new Error('completed scan returned no result');
    }

    // NOTE(review): this runs from a scheduled job, where there may be no
    // request JWT to populate `jwtClaims` - confirm `sub` is set here, or
    // take the owner from the scan row instead.
    const userId = jwtClaims.sub;

    // 1. Set processing to FALSE to prevent API from accidentally re-processing the same scan.
    //    The WHERE clause is essential: without it every row in "scans" is overwritten.
    await pgClient.query(
        `UPDATE "scans" SET "processing"=FALSE, "results"=$1 WHERE "id"=$2`,
        [result, scan.id],
    );

    // 2. Find existing IDs for urls, messages, tags, & nodes (or create them).
    //    Rows are handled one at a time so a value repeated within the same
    //    result finds the row created moments earlier instead of inserting twice.
    for (const row of result.urls ?? []) {
        row.id = await findOrCreateId(
            'urls',
            ['user_id', 'url', 'property_id'],
            [userId, row.url, scan.property_id],
        );
    }
    for (const row of result.messages ?? []) {
        row.id = await findOrCreateId(
            'messages',
            ['user_id', 'message', 'type'],
            [userId, row.message, row.type],
        );
    }
    for (const row of result.tags ?? []) {
        row.id = await findOrCreateId('tags', ['user_id', 'tag'], [userId, row.tag]);
    }
    for (const row of result.nodes ?? []) {
        // NOTE(review): `row.url_id` is passed through exactly as the scan
        // service reported it - confirm it should not be mapped to the
        // matching `result.urls[].id` resolved above.
        row.id = await findOrCreateId(
            'nodes',
            ['user_id', 'html', 'targets', 'url_id'],
            [userId, row.html, row.targets, row.url_id],
        );
    }

    // 3. Compare & update nodes

    // 4. Delete the scan and move on!
    await pgClient.query(`DELETE FROM "scans" WHERE "id"=$1`, [scan.id]);
};