Skip to content

Commit

Permalink
Merge pull request #10 from EqualifyEverything/staging
Browse files Browse the repository at this point in the history
Removed userId association from messages/tags, using dev scan on staging
  • Loading branch information
bbertucc committed Sep 23, 2024
2 parents 6cc56cf + 1632ab2 commit aa8ccbe
Show file tree
Hide file tree
Showing 7 changed files with 90 additions and 31 deletions.
5 changes: 4 additions & 1 deletion src/internal.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
import { processScans } from '#src/internal/index';
import { migrateMessagesAndTags, processScans } from '#src/internal/index';

/**
 * Entry point for internal API invocations: dispatches the event to the
 * matching handler based on the request path suffix. Resolves to
 * undefined when no route matches (same as falling through every branch).
 */
export const internal = async (event) => {
    const routes: Array<[string, (e: unknown) => Promise<unknown>]> = [
        ['/processScans', processScans],
        ['/migrateMessagesAndTags', migrateMessagesAndTags],
    ];
    for (const [suffix, handler] of routes) {
        if (event.path.endsWith(suffix)) {
            return handler(event);
        }
    }
}
3 changes: 2 additions & 1 deletion src/internal/index.ts
Original file line number Diff line number Diff line change
@@ -1 +1,2 @@
export * from './processScans'
export * from './processScans'
export * from './migrateMessagesAndTags'
33 changes: 33 additions & 0 deletions src/internal/migrateMessagesAndTags.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
import { db, hashStringToUuid } from '#src/utils';

/**
 * One-off migration: backfills content-addressed ids for messages and tags.
 * Each distinct message/tag gets a deterministic id derived from its content
 * via hashStringToUuid, so rows no longer need a per-user association.
 *
 * Best-effort per row: inserts that fail (e.g. the row already exists from a
 * previous or partial run) are logged and skipped rather than aborting the
 * whole migration.
 */
export const migrateMessagesAndTags = async () => {
    await db.connect();
    try {
        // Make sure we have distinct message/tag rows
        const distinctMessages = (await db.query(`SELECT DISTINCT "message", "type" FROM "messages"`)).rows;
        for (const row of distinctMessages) {
            const messageId = hashStringToUuid(row.message);
            try {
                await db.query({
                    text: `INSERT INTO "messages" ("id","message","type") VALUES ($1, $2, $3)`,
                    values: [messageId, row.message, row.type],
                });
            }
            catch (err) {
                // Duplicate-key violations are expected on re-runs; log so
                // non-duplicate failures are at least visible.
                console.log(err, `migrate message insert skipped`, JSON.stringify({ messageId }));
            }
        }
        const distinctTags = (await db.query(`SELECT DISTINCT "tag" FROM "tags"`)).rows;
        for (const row of distinctTags) {
            const tagId = hashStringToUuid(row.tag);
            try {
                await db.query({
                    text: `INSERT INTO "tags" ("id","tag") VALUES ($1, $2)`,
                    values: [tagId, row.tag],
                });
            }
            catch (err) {
                console.log(err, `migrate tag insert skipped`, JSON.stringify({ tagId }));
            }
        }

        // Now, find all old messages and adjust.
    }
    finally {
        // Always release the connection, even if a SELECT above throws.
        await db.clean();
    }
    return;
}
49 changes: 24 additions & 25 deletions src/internal/processScans.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { chunk, db, sleep } from '#src/utils';
import { chunk, db, isStaging, sleep, hashStringToUuid } from '#src/utils';

export const processScans = async (event) => {
console.log(`START PROCESS SCANS`);
Expand All @@ -18,7 +18,7 @@ export const processScans = async (event) => {
console.log(`Start ${index} of ${batchesOfJobIds.length} batches`);
await Promise.allSettled(batchOfJobIds.map(jobId => new Promise(async (res) => {
try {
const scanResults = await fetch(`https://scan.equalify.app/results/${jobId}`, { signal: AbortSignal.timeout(10000) });
const scanResults = await fetch(`https://scan${isStaging ? '-dev' : ''}.equalify.app/results/${jobId}`, { signal: AbortSignal.timeout(10000) });
const { result, status } = await scanResults.json();
if (['delayed', 'active', 'waiting'].includes(status)) {
remainingScans.push(jobId);
Expand Down Expand Up @@ -156,25 +156,28 @@ const scanProcessor = async ({ result, jobId, userId, propertyId }) => {
}
}
for (const row of result.tags) {
const tagId = hashStringToUuid(row.tag);
row.id =
(await db.query({
text: `SELECT "id" FROM "tags" WHERE "user_id"=$1 AND "tag"=$2`,
values: [userId, row.tag],
text: `SELECT "id" FROM "tags" WHERE "id"=$1`,
values: [tagId],
})).rows?.[0]?.id
??
(await db.query({
text: `INSERT INTO "tags" ("user_id", "tag") VALUES ($1, $2) RETURNING "id"`,
values: [userId, row.tag],
text: `INSERT INTO "tags" ("id", "tag") VALUES ($1, $2) RETURNING "id"`,
values: [tagId, row.tag],
})).rows?.[0]?.id;
}
for (const row of result.messages) {
row.id = (await db.query({
text: `SELECT "id" FROM "messages" WHERE "user_id"=$1 AND "message"=$2 AND "type"=$3`,
values: [userId, row.message, row.type],
})).rows?.[0]?.id ??
const messageId = hashStringToUuid(row.message);
const existingMessageId = (await db.query({
text: `SELECT "id" FROM "messages" WHERE "id"=$1`,
values: [messageId],
})).rows?.[0]?.id;
row.id = existingMessageId ??
(await db.query({
text: `INSERT INTO "messages" ("user_id", "message", "type") VALUES ($1, $2, $3) RETURNING "id"`,
values: [userId, row.message, row.type],
text: `INSERT INTO "messages" ("id", "message", "type") VALUES ($1, $2, $3) RETURNING "id"`,
values: [messageId, row.message, row.type],
})).rows?.[0]?.id;

for (const relatedNodeId of row.relatedNodeIds) {
Expand All @@ -195,21 +198,17 @@ const scanProcessor = async ({ result, jobId, userId, propertyId }) => {
}
}

for (const relatedTagId of row.relatedTagIds) {
try {
const messageTagExists = (await db.query({
text: `SELECT "id" FROM "message_tags" WHERE "user_id"=$1 AND "message_id"=$2 AND "tag_id"=$3`,
values: [userId, row.id, result.nodes.find(obj => obj.nodeId === relatedTagId)?.id],
})).rows?.[0]?.id;
if (!messageTagExists) {
if (!existingMessageId) {
for (const relatedTagId of row.relatedTagIds) {
try {
await db.query({
text: `INSERT INTO "message_tags" ("user_id", "message_id", "tag_id") VALUES ($1, $2, $3)`,
values: [userId, row.id, result.tags.find(obj => obj.tagId === relatedTagId)?.id]
})
text: `INSERT INTO "message_tags" ("message_id", "tag_id") VALUES ($1, $2)`,
values: [messageId, result.tags.find(obj => obj.tagId === relatedTagId)?.id]
});
}
catch (err) {
console.log(err, `messageTag error`, JSON.stringify({ row }));
}
}
catch (err) {
console.log(err, `messageTag error`, JSON.stringify({ row }));
}
}
}
Expand Down
23 changes: 20 additions & 3 deletions src/routes/getResultsAll.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,18 @@ export const getResultsAll = async ({ request, reply }) => {
Messages, Tags, Properties, Pages are sorted by properties related to the most nodes w/ nodeEqualified set to false (most to least)
*/
await db.connect();
const report = (await db.query(`SELECT "id", "name", "filters" FROM "reports" WHERE "id" = $1`, [request.query.reportId])).rows[0];
const report = (await db.query(`SELECT "id", "name", "filters", "cache_date" FROM "reports" WHERE "id" = $1`, [request.query.reportId])).rows[0];
const currentDate = new Date().getTime();
const cacheDate = new Date(report.cache_date).getTime();
if (currentDate < cacheDate) {
const compressedBody = (await db.query({
text: `SELECT "cache_gzip" FROM "reports" WHERE "id"=$1`,
values: [request.query.reportId],
})).rows[0].cache_gzip;
reply.headers({ 'content-encoding': 'gzip' });
return reply.send(compressedBody);
}

const types = ['properties', 'urls', 'messages', 'nodes', 'tags', 'types', 'status'];
const filters = Object.fromEntries(types.map(obj => [obj, []]));
for (const type of types) {
Expand All @@ -19,7 +30,6 @@ export const getResultsAll = async ({ request, reply }) => {
text: `SELECT "id", "url" FROM "urls" WHERE "id" = ANY($1::uuid[]) OR "property_id" = ANY($2::uuid[])`,
values: [filters.urls, filters.properties],
})).rows;
await db.clean();

const response = await graphql({
request,
Expand Down Expand Up @@ -161,6 +171,13 @@ export const getResultsAll = async ({ request, reply }) => {
.sort((a, b) => a.date > b.date ? -1 : 1),
};
const compressedBody = gzipSync(JSON.stringify(body));
const cacheExpiry = new Date();
cacheExpiry.setMinutes(cacheExpiry.getMinutes() + 5);
await db.query({
text: `UPDATE "reports" SET "cache_gzip"=$1, "cache_date"=$2 WHERE "id"=$3`,
values: [compressedBody, cacheExpiry, request.query.reportId],
});
await db.clean();
reply.headers({ 'content-encoding': 'gzip' });
return reply.send(compressedBody)
return reply.send(compressedBody);
}
5 changes: 5 additions & 0 deletions src/utils/hashStringToUuid.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
import { createHash } from 'crypto';

/**
 * Derives a deterministic, content-addressed id from a string by MD5-hashing it.
 *
 * Returns the 32-character lowercase hex digest *without* dashes. PostgreSQL
 * accepts this dashless form as input for `uuid` columns, but note it is not
 * a canonical RFC 4122 UUID string (version/variant bits are arbitrary) —
 * NOTE(review): keep this output format stable, since ids already stored by
 * the migration and scan processor are compared against it.
 * MD5 is used here only for id derivation, not for any security purpose.
 */
export const hashStringToUuid = (input: string): string => {
    return createHash('md5').update(input).digest('hex');
}
3 changes: 2 additions & 1 deletion src/utils/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,4 +9,5 @@ export * from './getMode'
export * from './isStaging'
export * from './graphql'
export * from './sleep'
export * from './chunk'
export * from './chunk'
export * from './hashStringToUuid'

0 comments on commit aa8ccbe

Please sign in to comment.