Skip to content

Commit

Permalink
v1 of scan process refactor
Browse files Browse the repository at this point in the history
  • Loading branch information
azdak committed Nov 21, 2024
1 parent abb0b57 commit 41f8ae5
Show file tree
Hide file tree
Showing 2 changed files with 147 additions and 14 deletions.
133 changes: 133 additions & 0 deletions src/internal/ingestScanData.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,133 @@
import { hashStringToUuid } from '#src/utils';
/* ingests data from a scan job and adds it to the equalify database */
export const ingestScanData = async ( db, result, jobId, urlId, userId ) => {

// Find existing IDs for urls, messages, tags, & nodes (or create them)
if (result.nodes.length > 0) {

// **** urls // Removed - all URLs should already exist in the URL table
/* for (const row of result.urls) {
row.id =
(await db.query({
text: `SELECT "id" FROM "urls" WHERE "user_id"=$1 AND "url"=$2`,
values: [userId, row.url],
})).rows?.[0]?.id
??
(await db.query({
text: `INSERT INTO "urls" ("user_id", "url", "property_id") VALUES ($1, $2, $3) RETURNING "id"`,
values: [userId, row.url, propertyId]
})).rows?.[0]?.id;
} */

// **** enodes
for (const row of result.nodes) {

// see if this node matches an existing enode
const existingId = (await db.query({
text: `SELECT "id" FROM "enodes" WHERE "user_id"=$1 AND "html"=$2 AND "targets"=$3 AND "url_id"=$4`,
values: [userId, row.html, JSON.stringify(row.targets), result.urls.find(obj => obj.urlId === row.relatedUrlId)?.id],
})).rows?.[0]?.id;

row.id = existingId ??
// if not, create the enode
(await db.query({
text: `INSERT INTO "enodes" ("user_id", "html", "targets", "url_id", "equalified") VALUES ($1, $2, $3, $4, $5) RETURNING "id"`,
values: [userId, row.html, JSON.stringify(row.targets), result.urls.find(obj => obj.urlId === row.relatedUrlId)?.id, false],
})).rows?.[0]?.id;

// if row existed, set to not equalified
if (existingId) {
await db.query({
text: `UPDATE "enodes" SET "equalified"=$1 WHERE "id"=$2`,
values: [false, row.id],
});
}

// **** enode_updates
// see if this node matches an existing enode_update
const existingNodeUpdateId = (await db.query({
text: `SELECT "id" FROM "enode_updates" WHERE "user_id"=$1 AND "enode_id"=$2 AND "created_at"::text LIKE $3`,
values: [userId, row.id, `${new Date().toISOString().split('T')[0]}%`],
})).rows[0]?.id;
// if enode_update existed, set to not equalified
if (existingNodeUpdateId) {
await db.query({
text: `UPDATE "enode_updates" SET "equalified"=$1 WHERE "id"=$2`,
values: [false, existingNodeUpdateId],
});
} else {
//otherwise create the enode_update
await db.query({
text: `INSERT INTO "enode_updates" ("user_id", "enode_id", "equalified") VALUES ($1, $2, $3)`,
values: [userId, row.id, false],
});
}

}
// **** tags
for (const row of result.tags) {
const tagId = hashStringToUuid(row.tag);
row.id =
(await db.query({
text: `SELECT "id" FROM "tags" WHERE "id"=$1`,
values: [tagId],
})).rows?.[0]?.id
??
(await db.query({
text: `INSERT INTO "tags" ("id", "tag") VALUES ($1, $2) RETURNING "id"`,
values: [tagId, row.tag],
})).rows?.[0]?.id;
}
// **** messages
for (const row of result.messages) {
const messageId = hashStringToUuid(row.message);
const existingMessageId = (await db.query({
text: `SELECT "id" FROM "messages" WHERE "id"=$1`,
values: [messageId],
})).rows?.[0]?.id;
row.id = existingMessageId ??
(await db.query({
text: `INSERT INTO "messages" ("id", "message", "type") VALUES ($1, $2, $3) RETURNING "id"`,
values: [messageId, row.message, row.type],
})).rows?.[0]?.id;

for (const relatedNodeId of row.relatedNodeIds) {
try {
const messsageNodeExists = (await db.query({
text: `SELECT "id" FROM "message_nodes" WHERE "user_id"=$1 AND "message_id"=$2 AND "enode_id"=$3`,
values: [userId, row.id, result.nodes.find(obj => obj.nodeId === relatedNodeId)?.id],
})).rows?.[0]?.id;
if (!messsageNodeExists) {
await db.query({
text: `INSERT INTO "message_nodes" ("user_id", "message_id", "enode_id") VALUES ($1, $2, $3)`,
values: [userId, row.id, result.nodes.find(obj => obj.nodeId === relatedNodeId)?.id]
})
}
}
catch (err) {
console.log(err, `messageNode error`, JSON.stringify({ row }));
}
}

if (!existingMessageId) {
for (const relatedTagId of row.relatedTagIds) {
try {
await db.query({
text: `INSERT INTO "message_tags" ("message_id", "tag_id") VALUES ($1, $2)`,
values: [messageId, result.tags.find(obj => obj.tagId === relatedTagId)?.id]
});
}
catch (err) {
console.log(err, `messageTag error`, JSON.stringify({ row }));
}
}
}
}
}
await db.query({
text: `UPDATE "scans" SET "processing"=FALSE, "results"=$1 WHERE "job_id"=$2`,
values: [result, jobId],
});

return result.nodes;
}
28 changes: 14 additions & 14 deletions src/internal/pollOutstandingScans.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,30 +3,29 @@ import { ingestScanData } from './ingestScanData';
export const pollOutstandingScans = async ({ request, reply }) => {
console.log(`START POLLING OPEN SCANS`);
const startTime = new Date().getTime();
const jobIds = (await db.query({
text: `SELECT job_id, url_id FROM scans WHERE processing = TRUE`
}));
return jobIds;
const jobs = (await db.query({
text: `SELECT job_id, url_id, user_id FROM scans WHERE processing = TRUE`
})).rows;

// make sure we sort the open jobids ASC
const sortedJobIds = jobIds.jobIds.sort((a, b) => (a - b));
return sortedJobIds;

for(const jobId of sortedJobIds){
const sortedJobIds = jobs.sort((a, b) => (a.job_id - b.job_id));
const processedJobs = [];
for(const job of sortedJobIds){
// check scan for result
try{
const scanResults = await fetch(`https://scan${isStaging ? '-dev' : ''}.equalify.app/results/${jobId}`, { signal: AbortSignal.timeout(10000) });
const scanResults = await fetch(`https://scan${isStaging ? '-dev' : ''}.equalify.app/results/${job.job_id}`, { signal: AbortSignal.timeout(10000) });
const { result, status } = await scanResults.json();

// jobIDs are processed in ascending order, so we can stop checking here
if (['delayed', 'active', 'waiting'].includes(status)) {
break;
break; // jobIDs are processed in ascending order, so we can stop checking here
}
else if (['failed', 'unknown'].includes(status)) {
await db.query(`DELETE FROM "scans" WHERE "job_id"=$1`, [jobId]);
await db.query(`DELETE FROM "scans" WHERE "job_id"=$1`, [job.job_id]);
}
else if (['completed'].includes(status)) {
const nodeIds = await ingestScanData({ result, jobId });
const nodeIds = await ingestScanData( db, result, job.job_id, job.url_id, job.user_id );
processedJobs.push(nodeIds);
}

} catch (err) {
Expand All @@ -38,6 +37,7 @@ export const pollOutstandingScans = async ({ request, reply }) => {
const deltaTime = new Date().getTime() - startTime;
console.log(`END PROCESS SCANS, took ${deltaTime}`);
return {
sortedJobIds
processedJobs: processedJobs,
perf: `${processedJobs.length} jobs processed, took ${deltaTime}ms`
};
}

0 comments on commit 41f8ae5

Please sign in to comment.