From 940383dfb4fff4e47e772a461956d5b0ad7678c7 Mon Sep 17 00:00:00 2001 From: Christopher Aitken Date: Thu, 22 Aug 2024 20:22:01 -0500 Subject: [PATCH] Compressed processScans payload, batching scan requests in chunks of 100 --- src/internal/processScans.ts | 85 +++++++++++++++++++----------------- src/routes/addScans.ts | 12 ++--- src/routes/getScans.ts | 2 +- src/utils/chunk.ts | 7 +++ src/utils/index.ts | 3 +- 5 files changed, 61 insertions(+), 48 deletions(-) create mode 100644 src/utils/chunk.ts diff --git a/src/internal/processScans.ts b/src/internal/processScans.ts index f2cdc8e..0b8b70b 100644 --- a/src/internal/processScans.ts +++ b/src/internal/processScans.ts @@ -1,33 +1,38 @@ -import { db, sleep } from '#src/utils'; +import { chunk, db, sleep } from '#src/utils'; export const processScans = async (event) => { console.log(`START PROCESS SCANS`); await db.connect(); - const scans = event.scans; + const { jobIds, userId, propertyId } = event; const allNodeIds = []; - const pollScans = (givenScans) => new Promise(async (finalRes) => { + const pollScans = (givenJobIds) => new Promise(async (finalRes) => { await sleep(2500); const remainingScans = []; - await Promise.allSettled(givenScans.map(scan => new Promise(async (res) => { - try { - const scanResults = await fetch(`https://scan.equalify.app/results/${scan.job_id}`, { signal: AbortSignal.timeout(250) }); - const { result, status } = await scanResults.json(); - if (['delayed', 'active', 'waiting'].includes(status)) { - remainingScans.push(scan); - } - else if (['failed', 'unknown'].includes(status)) { - await db.query(`DELETE FROM "scans" WHERE "id"=$1`, [scan.id]); + const batchesOfJobIds = chunk(givenJobIds, 100); + for (const [index, batchOfJobIds] of batchesOfJobIds.entries()) { + console.log(`Start batch ${index + 1} of ${batchesOfJobIds.length}`); + await Promise.allSettled(batchOfJobIds.map(jobId => new Promise(async (res) => { + try { + const scanResults = await fetch(`https://scan.equalify.app/results/${jobId}`, { signal: AbortSignal.timeout(250) }); + const { result, status } = await scanResults.json(); + if (['delayed', 'active', 'waiting'].includes(status)) { + remainingScans.push(jobId); + } + else if (['failed', 'unknown'].includes(status)) { + await db.query(`DELETE FROM "scans" WHERE "job_id"=$1`, [jobId]); + } + else if (['completed'].includes(status)) { + const nodeIds = await scanProcessor({ result, jobId, userId, propertyId }); + allNodeIds.push(...nodeIds); + } } - else if (['completed'].includes(status)) { - const nodeIds = await scanProcessor({ result, scan }); - allNodeIds.push(...nodeIds); + catch (err) { + remainingScans.push(jobId); } - } - catch (err) { - remainingScans.push(scan); - } - res(1); - }))); + res(1); + }))); + console.log(`End batch ${index + 1} of ${batchesOfJobIds.length}`); + } console.log(JSON.stringify({ remainingScans: remainingScans.length })); if (remainingScans.length > 0) { await pollScans(remainingScans); @@ -37,12 +42,10 @@ export const processScans = async (event) => { } console.log(`End Reached`); }); - await pollScans(scans); + await pollScans(jobIds); // At the end of all scans, reconcile equalified nodes // Set node equalified to true for previous nodes associated w/ this scan! - const userId = scans?.[0]?.user_id; - const propertyId = scans?.[0]?.property_id; const allPropertyUrls = (await db.query({ text: `SELECT "id" FROM "urls" WHERE "user_id"=$1 AND "property_id"=$2`, values: [userId, propertyId], @@ -80,36 +83,36 @@ export const processScans = async (event) => { return; } -const scanProcessor = async ({ result, scan }) => { +const scanProcessor = async ({ result, jobId, userId, propertyId }) => { // Find existing IDs for urls, messages, tags, & nodes (or create them) if (result.nodes.length > 0) { for (const row of result.urls) { row.id = (await db.query({ text: `SELECT "id" FROM "urls" WHERE "user_id"=$1 AND "url"=$2 AND "property_id"=$3`, - values: [scan.user_id, row.url, scan.property_id], + values: [userId, row.url, propertyId], })).rows?.[0]?.id ?? (await db.query({ text: `INSERT INTO "urls" ("user_id", "url", "property_id") VALUES ($1, $2, $3) RETURNING "id"`, - values: [scan.user_id, row.url, scan.property_id] + values: [userId, row.url, propertyId] })).rows?.[0]?.id; } for (const row of result.nodes) { const existingId = (await db.query({ text: `SELECT "id" FROM "enodes" WHERE "user_id"=$1 AND "html"=$2 AND "targets"=$3 AND "url_id"=$4`, - values: [scan.user_id, row.html, JSON.stringify(row.targets), result.urls.find(obj => obj.urlId === row.relatedUrlId)?.id], + values: [userId, row.html, JSON.stringify(row.targets), result.urls.find(obj => obj.urlId === row.relatedUrlId)?.id], })).rows?.[0]?.id; row.id = existingId ?? (await db.query({ text: `INSERT INTO "enodes" ("user_id", "html", "targets", "url_id", "equalified") VALUES ($1, $2, $3, $4, $5) RETURNING "id"`, - values: [scan.user_id, row.html, JSON.stringify(row.targets), result.urls.find(obj => obj.urlId === row.relatedUrlId)?.id, false], + values: [userId, row.html, JSON.stringify(row.targets), result.urls.find(obj => obj.urlId === row.relatedUrlId)?.id, false], })).rows?.[0]?.id; const existingNodeUpdateId = (await db.query({ text: `SELECT "id" FROM "enode_updates" WHERE "user_id"=$1 AND "enode_id"=$2 AND "created_at"::text LIKE $3`, - values: [scan.user_id, row.id, `${new Date().toISOString().split('T')[0]}%`], + values: [userId, row.id, `${new Date().toISOString().split('T')[0]}%`], })).rows[0]?.id; if (existingNodeUpdateId) { await db.query({ @@ -120,7 +123,7 @@ const scanProcessor = async ({ result, scan }) => { else { await db.query({ text: `INSERT INTO "enode_updates" ("user_id", "enode_id", "equalified") VALUES ($1, $2, $3)`, - values: [scan.user_id, row.id, false], + values: [userId, row.id, false], }); } @@ -135,34 +138,34 @@ const scanProcessor = async ({ result, scan }) => { row.id = (await db.query({ text: `SELECT "id" FROM "tags" WHERE "user_id"=$1 AND "tag"=$2`, - values: [scan.user_id, row.tag], + values: [userId, row.tag], })).rows?.[0]?.id ?? (await db.query({ text: `INSERT INTO "tags" ("user_id", "tag") VALUES ($1, $2) RETURNING "id"`, - values: [scan.user_id, row.tag], + values: [userId, row.tag], })).rows?.[0]?.id; } for (const row of result.messages) { row.id = (await db.query({ text: `SELECT "id" FROM "messages" WHERE "user_id"=$1 AND "message"=$2 AND "type"=$3`, - values: [scan.user_id, row.message, row.type], + values: [userId, row.message, row.type], })).rows?.[0]?.id ?? (await db.query({ text: `INSERT INTO "messages" ("user_id", "message", "type") VALUES ($1, $2, $3) RETURNING "id"`, - values: [scan.user_id, row.message, row.type], + values: [userId, row.message, row.type], })).rows?.[0]?.id; for (const relatedNodeId of row.relatedNodeIds) { try { const messsageNodeExists = (await db.query({ text: `SELECT "id" FROM "message_nodes" WHERE "user_id"=$1 AND "message_id"=$2 AND "enode_id"=$3`, - values: [scan.user_id, row.id, result.nodes.find(obj => obj.nodeId === relatedNodeId)?.id], + values: [userId, row.id, result.nodes.find(obj => obj.nodeId === relatedNodeId)?.id], })).rows?.[0]?.id; if (!messsageNodeExists) { await db.query({ text: `INSERT INTO "message_nodes" ("user_id", "message_id", "enode_id") VALUES ($1, $2, $3)`, - values: [scan.user_id, row.id, result.nodes.find(obj => obj.nodeId === relatedNodeId)?.id] + values: [userId, row.id, result.nodes.find(obj => obj.nodeId === relatedNodeId)?.id] }) } } @@ -175,12 +178,12 @@ const scanProcessor = async ({ result, scan }) => { try { const messageTagExists = (await db.query({ text: `SELECT "id" FROM "message_tags" WHERE "user_id"=$1 AND "message_id"=$2 AND "tag_id"=$3`, - values: [scan.user_id, row.id, result.nodes.find(obj => obj.nodeId === relatedTagId)?.id], + values: [userId, row.id, result.nodes.find(obj => obj.nodeId === relatedTagId)?.id], })).rows?.[0]?.id; if (!messageTagExists) { await db.query({ text: `INSERT INTO "message_tags" ("user_id", "message_id", "tag_id") VALUES ($1, $2, $3)`, - values: [scan.user_id, row.id, result.tags.find(obj => obj.tagId === relatedTagId)?.id] + values: [userId, row.id, result.tags.find(obj => obj.tagId === relatedTagId)?.id] }) } } @@ -191,8 +194,8 @@ const scanProcessor = async ({ result, scan }) => { } } await db.query({ - text: `UPDATE "scans" SET "processing"=FALSE, "results"=$1 WHERE "id"=$2`, - values: [result, scan.id], + text: `UPDATE "scans" SET "processing"=FALSE, "results"=$1 WHERE "job_id"=$2`, + values: [result, jobId], }); return result.nodes; diff --git a/src/routes/addScans.ts b/src/routes/addScans.ts index 3ca3b62..e406d71 100644 --- a/src/routes/addScans.ts +++ b/src/routes/addScans.ts @@ -13,7 +13,7 @@ export const addScans = async ({ request, reply }) => { await db.connect(); for (const propertyId of request.body.propertyIds ?? []) { - const scans = []; + const jobIds = []; const property = (await db.query(`SELECT "id", "discovery", "property_url" FROM "properties" WHERE "id"=$1`, [propertyId])).rows?.[0]; if (!property.discovery) { return { @@ -39,7 +39,7 @@ export const addScans = async ({ request, reply }) => { headers: { 'Content-Type': 'application/json' }, body: JSON.stringify({ url: property.property_url }) })).json(); - console.log(JSON.stringify({ scanResponse })); + // console.log(JSON.stringify({ scanResponse })); for (const { jobId, url } of scanResponse?.jobs ?? []) { const urlId = (await db.query({ @@ -51,10 +51,10 @@ export const addScans = async ({ request, reply }) => { })).rows?.[0]?.id; const scan = (await db.query({ - text: `INSERT INTO "scans" ("user_id", "property_id", "url_id", "job_id") VALUES ($1, $2, $3, $4) RETURNING "id", "job_id", "property_id", "user_id"`, + text: `INSERT INTO "scans" ("user_id", "property_id", "url_id", "job_id") VALUES ($1, $2, $3, $4) RETURNING "job_id"`, values: [jwtClaims.sub, propertyId, urlId, parseInt(jobId)] })).rows[0]; - scans.push(scan); + jobIds.push(scan.job_id); } } } @@ -67,7 +67,9 @@ export const addScans = async ({ request, reply }) => { InvocationType: "Event", Payload: Buffer.from(JSON.stringify({ path: '/internal/processScans', - scans: scans + jobIds: jobIds, + userId: jwtClaims.sub, + propertyId: propertyId, })), })); } diff --git a/src/routes/getScans.ts b/src/routes/getScans.ts index cf2ac1b..cab9275 100644 --- a/src/routes/getScans.ts +++ b/src/routes/getScans.ts @@ -27,7 +27,7 @@ export const getScans = async ({ request, reply }) => { } }`, variables: { - limit: parseInt(request.query.limit ?? 100), + limit: parseInt(request.query.limit ?? 5000), offset: parseInt(request.query.offset ?? 0), }, }); diff --git a/src/utils/chunk.ts b/src/utils/chunk.ts new file mode 100644 index 0000000..55c448e --- /dev/null +++ b/src/utils/chunk.ts @@ -0,0 +1,7 @@ +export const chunk = (array, size) => { + const result = []; + for (let i = 0; i < array.length; i += size) { + result.push(array.slice(i, i + size)) + } + return result; +} \ No newline at end of file diff --git a/src/utils/index.ts b/src/utils/index.ts index 53c38fe..d156779 100644 --- a/src/utils/index.ts +++ b/src/utils/index.ts @@ -8,4 +8,5 @@ export * from './formatEmail' export * from './getMode' export * from './isStaging' export * from './graphql' -export * from './sleep' \ No newline at end of file +export * from './sleep' +export * from './chunk' \ No newline at end of file