Compressed processScans payload, batching scan requests in chunks of 100
heythisischris committed Aug 23, 2024
1 parent 3a39185 commit 940383d
Showing 5 changed files with 61 additions and 48 deletions.
85 changes: 44 additions & 41 deletions src/internal/processScans.ts
@@ -1,33 +1,38 @@
-import { db, sleep } from '#src/utils';
+import { chunk, db, sleep } from '#src/utils';
 
 export const processScans = async (event) => {
     console.log(`START PROCESS SCANS`);
     await db.connect();
-    const scans = event.scans;
+    const { jobIds, userId, propertyId } = event;
     const allNodeIds = [];
-    const pollScans = (givenScans) => new Promise(async (finalRes) => {
+    const pollScans = (givenJobIds) => new Promise(async (finalRes) => {
         await sleep(2500);
         const remainingScans = [];
-        await Promise.allSettled(givenScans.map(scan => new Promise(async (res) => {
-            try {
-                const scanResults = await fetch(`https://scan.equalify.app/results/${scan.job_id}`, { signal: AbortSignal.timeout(250) });
-                const { result, status } = await scanResults.json();
-                if (['delayed', 'active', 'waiting'].includes(status)) {
-                    remainingScans.push(scan);
-                }
-                else if (['failed', 'unknown'].includes(status)) {
-                    await db.query(`DELETE FROM "scans" WHERE "id"=$1`, [scan.id]);
-                }
-                else if (['completed'].includes(status)) {
-                    const nodeIds = await scanProcessor({ result, scan });
-                    allNodeIds.push(...nodeIds);
-                }
-            }
-            catch (err) {
-                remainingScans.push(scan);
-            }
-            res(1);
-        })));
+        const batchesOfJobIds = chunk(givenJobIds, 100);
+        for (const [index, batchOfJobIds] of batchesOfJobIds.entries()) {
+            console.log(`Start batch ${index + 1} of ${batchesOfJobIds.length}`);
+            await Promise.allSettled(batchOfJobIds.map(jobId => new Promise(async (res) => {
+                try {
+                    const scanResults = await fetch(`https://scan.equalify.app/results/${jobId}`, { signal: AbortSignal.timeout(250) });
+                    const { result, status } = await scanResults.json();
+                    if (['delayed', 'active', 'waiting'].includes(status)) {
+                        remainingScans.push(jobId);
+                    }
+                    else if (['failed', 'unknown'].includes(status)) {
+                        await db.query(`DELETE FROM "scans" WHERE "job_id"=$1`, [jobId]);
+                    }
+                    else if (['completed'].includes(status)) {
+                        const nodeIds = await scanProcessor({ result, jobId, userId, propertyId });
+                        allNodeIds.push(...nodeIds);
+                    }
+                }
+                catch (err) {
+                    remainingScans.push(jobId);
+                }
+                res(1);
+            })));
+            console.log(`End batch ${index + 1} of ${batchesOfJobIds.length}`);
+        }
         console.log(JSON.stringify({ remainingScans: remainingScans.length }));
         if (remainingScans.length > 0) {
             await pollScans(remainingScans);
@@ -37,12 +42,10 @@ export const processScans = async (event) => {
         }
         console.log(`End Reached`);
     });
-    await pollScans(scans);
+    await pollScans(jobIds);
 
     // At the end of all scans, reconcile equalified nodes
     // Set node equalified to true for previous nodes associated w/ this scan!
-    const userId = scans?.[0]?.user_id;
-    const propertyId = scans?.[0]?.property_id;
     const allPropertyUrls = (await db.query({
         text: `SELECT "id" FROM "urls" WHERE "user_id"=$1 AND "property_id"=$2`,
         values: [userId, propertyId],
@@ -80,36 +83,36 @@ export const processScans = async (event) => {
     return;
 }
 
-const scanProcessor = async ({ result, scan }) => {
+const scanProcessor = async ({ result, jobId, userId, propertyId }) => {
     // Find existing IDs for urls, messages, tags, & nodes (or create them)
     if (result.nodes.length > 0) {
         for (const row of result.urls) {
             row.id =
                 (await db.query({
                     text: `SELECT "id" FROM "urls" WHERE "user_id"=$1 AND "url"=$2 AND "property_id"=$3`,
-                    values: [scan.user_id, row.url, scan.property_id],
+                    values: [userId, row.url, propertyId],
                 })).rows?.[0]?.id
                 ??
                 (await db.query({
                     text: `INSERT INTO "urls" ("user_id", "url", "property_id") VALUES ($1, $2, $3) RETURNING "id"`,
-                    values: [scan.user_id, row.url, scan.property_id]
+                    values: [userId, row.url, propertyId]
                 })).rows?.[0]?.id;
         }
         for (const row of result.nodes) {
             const existingId = (await db.query({
                 text: `SELECT "id" FROM "enodes" WHERE "user_id"=$1 AND "html"=$2 AND "targets"=$3 AND "url_id"=$4`,
-                values: [scan.user_id, row.html, JSON.stringify(row.targets), result.urls.find(obj => obj.urlId === row.relatedUrlId)?.id],
+                values: [userId, row.html, JSON.stringify(row.targets), result.urls.find(obj => obj.urlId === row.relatedUrlId)?.id],
             })).rows?.[0]?.id;
 
             row.id = existingId ??
                 (await db.query({
                     text: `INSERT INTO "enodes" ("user_id", "html", "targets", "url_id", "equalified") VALUES ($1, $2, $3, $4, $5) RETURNING "id"`,
-                    values: [scan.user_id, row.html, JSON.stringify(row.targets), result.urls.find(obj => obj.urlId === row.relatedUrlId)?.id, false],
+                    values: [userId, row.html, JSON.stringify(row.targets), result.urls.find(obj => obj.urlId === row.relatedUrlId)?.id, false],
                 })).rows?.[0]?.id;
 
             const existingNodeUpdateId = (await db.query({
                 text: `SELECT "id" FROM "enode_updates" WHERE "user_id"=$1 AND "enode_id"=$2 AND "created_at"::text LIKE $3`,
-                values: [scan.user_id, row.id, `${new Date().toISOString().split('T')[0]}%`],
+                values: [userId, row.id, `${new Date().toISOString().split('T')[0]}%`],
             })).rows[0]?.id;
             if (existingNodeUpdateId) {
                 await db.query({
@@ -120,7 +123,7 @@ const scanProcessor = async ({ result, scan }) => {
             else {
                 await db.query({
                     text: `INSERT INTO "enode_updates" ("user_id", "enode_id", "equalified") VALUES ($1, $2, $3)`,
-                    values: [scan.user_id, row.id, false],
+                    values: [userId, row.id, false],
                 });
             }
 
@@ -135,34 +138,34 @@ const scanProcessor = async ({ result, scan }) => {
             row.id =
                 (await db.query({
                     text: `SELECT "id" FROM "tags" WHERE "user_id"=$1 AND "tag"=$2`,
-                    values: [scan.user_id, row.tag],
+                    values: [userId, row.tag],
                 })).rows?.[0]?.id
                 ??
                 (await db.query({
                     text: `INSERT INTO "tags" ("user_id", "tag") VALUES ($1, $2) RETURNING "id"`,
-                    values: [scan.user_id, row.tag],
+                    values: [userId, row.tag],
                 })).rows?.[0]?.id;
         }
         for (const row of result.messages) {
             row.id = (await db.query({
                 text: `SELECT "id" FROM "messages" WHERE "user_id"=$1 AND "message"=$2 AND "type"=$3`,
-                values: [scan.user_id, row.message, row.type],
+                values: [userId, row.message, row.type],
             })).rows?.[0]?.id ??
                 (await db.query({
                     text: `INSERT INTO "messages" ("user_id", "message", "type") VALUES ($1, $2, $3) RETURNING "id"`,
-                    values: [scan.user_id, row.message, row.type],
+                    values: [userId, row.message, row.type],
                 })).rows?.[0]?.id;
 
             for (const relatedNodeId of row.relatedNodeIds) {
                 try {
                     const messsageNodeExists = (await db.query({
                         text: `SELECT "id" FROM "message_nodes" WHERE "user_id"=$1 AND "message_id"=$2 AND "enode_id"=$3`,
-                        values: [scan.user_id, row.id, result.nodes.find(obj => obj.nodeId === relatedNodeId)?.id],
+                        values: [userId, row.id, result.nodes.find(obj => obj.nodeId === relatedNodeId)?.id],
                     })).rows?.[0]?.id;
                     if (!messsageNodeExists) {
                         await db.query({
                             text: `INSERT INTO "message_nodes" ("user_id", "message_id", "enode_id") VALUES ($1, $2, $3)`,
-                            values: [scan.user_id, row.id, result.nodes.find(obj => obj.nodeId === relatedNodeId)?.id]
+                            values: [userId, row.id, result.nodes.find(obj => obj.nodeId === relatedNodeId)?.id]
                         })
                     }
                 }
@@ -175,12 +178,12 @@ const scanProcessor = async ({ result, scan }) => {
                 try {
                     const messageTagExists = (await db.query({
                         text: `SELECT "id" FROM "message_tags" WHERE "user_id"=$1 AND "message_id"=$2 AND "tag_id"=$3`,
-                        values: [scan.user_id, row.id, result.nodes.find(obj => obj.nodeId === relatedTagId)?.id],
+                        values: [userId, row.id, result.nodes.find(obj => obj.nodeId === relatedTagId)?.id],
                     })).rows?.[0]?.id;
                     if (!messageTagExists) {
                         await db.query({
                             text: `INSERT INTO "message_tags" ("user_id", "message_id", "tag_id") VALUES ($1, $2, $3)`,
-                            values: [scan.user_id, row.id, result.tags.find(obj => obj.tagId === relatedTagId)?.id]
+                            values: [userId, row.id, result.tags.find(obj => obj.tagId === relatedTagId)?.id]
                         })
                     }
                 }
@@ -191,8 +194,8 @@ const scanProcessor = async ({ result, scan }) => {
         }
     }
     await db.query({
-        text: `UPDATE "scans" SET "processing"=FALSE, "results"=$1 WHERE "id"=$2`,
-        values: [result, scan.id],
+        text: `UPDATE "scans" SET "processing"=FALSE, "results"=$1 WHERE "job_id"=$2`,
+        values: [result, jobId],
    });
 
     return result.nodes;
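For reference, the polling pattern this commit introduces can be sketched in isolation as follows. This is a minimal sketch, not the repository's code: chunk, sleep, the results URL, the 2500ms delay, the 100-job batch size, and the pending statuses come from the diff above, while the pollJobIds name and the simplified terminal handling are illustrative.

import { chunk, sleep } from '#src/utils';

const pollJobIds = async (jobIds: number[]): Promise<void> => {
    await sleep(2500);
    const remaining: number[] = [];
    // Poll in batches of 100 so a large scan doesn't fire thousands of concurrent requests.
    for (const batch of chunk(jobIds, 100)) {
        await Promise.allSettled(batch.map(async (jobId) => {
            try {
                const res = await fetch(`https://scan.equalify.app/results/${jobId}`, { signal: AbortSignal.timeout(250) });
                const { status } = await res.json();
                if (['delayed', 'active', 'waiting'].includes(status)) {
                    remaining.push(jobId); // still queued or running; check again next pass
                }
                // 'completed', 'failed', and 'unknown' are terminal; the real code
                // processes or deletes those scans as shown in the diff above.
            } catch {
                remaining.push(jobId); // timeout or network error; retry on the next pass
            }
        }));
    }
    if (remaining.length > 0) {
        await pollJobIds(remaining); // recurse until every job settles
    }
};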
12 changes: 7 additions & 5 deletions src/routes/addScans.ts
@@ -13,7 +13,7 @@ export const addScans = async ({ request, reply }) => {
 
     await db.connect();
     for (const propertyId of request.body.propertyIds ?? []) {
-        const scans = [];
+        const jobIds = [];
         const property = (await db.query(`SELECT "id", "discovery", "property_url" FROM "properties" WHERE "id"=$1`, [propertyId])).rows?.[0];
         if (!property.discovery) {
             return {
@@ -39,7 +39,7 @@ export const addScans = async ({ request, reply }) => {
                 headers: { 'Content-Type': 'application/json' },
                 body: JSON.stringify({ url: property.property_url })
             })).json();
-            console.log(JSON.stringify({ scanResponse }));
+            // console.log(JSON.stringify({ scanResponse }));
 
             for (const { jobId, url } of scanResponse?.jobs ?? []) {
                 const urlId = (await db.query({
@@ -51,10 +51,10 @@ export const addScans = async ({ request, reply }) => {
                 })).rows?.[0]?.id;
 
                 const scan = (await db.query({
-                    text: `INSERT INTO "scans" ("user_id", "property_id", "url_id", "job_id") VALUES ($1, $2, $3, $4) RETURNING "id", "job_id", "property_id", "user_id"`,
+                    text: `INSERT INTO "scans" ("user_id", "property_id", "url_id", "job_id") VALUES ($1, $2, $3, $4) RETURNING "job_id"`,
                     values: [jwtClaims.sub, propertyId, urlId, parseInt(jobId)]
                 })).rows[0];
-                scans.push(scan);
+                jobIds.push(scan.job_id);
             }
         }
     }
@@ -67,7 +67,9 @@ export const addScans = async ({ request, reply }) => {
             InvocationType: "Event",
             Payload: Buffer.from(JSON.stringify({
                 path: '/internal/processScans',
-                scans: scans
+                jobIds: jobIds,
+                userId: jwtClaims.sub,
+                propertyId: propertyId,
             })),
         }));
     }
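This payload is what the commit title calls "compressed": instead of an array of full scan rows, the invocation now carries only the job IDs plus the user and property context that are shared by every scan in the batch. A representative payload after this change might look like the following (all values illustrative, not taken from the repository):

const payload = {
    path: '/internal/processScans',
    jobIds: [101, 102, 103],  // job_id values returned by the INSERTs above
    userId: 'auth0|abc123',   // jwtClaims.sub (example value)
    propertyId: 'prop-42',    // example property ID
};

Since userId and propertyId were identical on every scan row, sending them once removes the per-scan duplication that inflated the old payload.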
2 changes: 1 addition & 1 deletion src/routes/getScans.ts
@@ -27,7 +27,7 @@ export const getScans = async ({ request, reply }) => {
         }
     }`,
         variables: {
-            limit: parseInt(request.query.limit ?? 100),
+            limit: parseInt(request.query.limit ?? 5000),
             offset: parseInt(request.query.offset ?? 0),
         },
     });
7 changes: 7 additions & 0 deletions src/utils/chunk.ts
@@ -0,0 +1,7 @@
+export const chunk = (array, size) => {
+    const result = [];
+    for (let i = 0; i < array.length; i += size) {
+        result.push(array.slice(i, i + size))
+    }
+    return result;
+}
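A quick sanity check of the new helper (example values, not repository code):

import { chunk } from '#src/utils';

// Splits an array into fixed-size batches; the last batch may be shorter.
chunk([1, 2, 3, 4, 5], 2); // => [[1, 2], [3, 4], [5]]
chunk([], 100);            // => []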
3 changes: 2 additions & 1 deletion src/utils/index.ts
@@ -8,4 +8,5 @@ export * from './formatEmail'
 export * from './getMode'
 export * from './isStaging'
 export * from './graphql'
-export * from './sleep'
+export * from './sleep'
+export * from './chunk'
