Skip to content

Commit

Permalink
fix: various tweaks for sync to work in this repo
Browse files Browse the repository at this point in the history
  • Loading branch information
MichaelTaylor3D committed Oct 13, 2023
1 parent 5f7ad86 commit 54ef3dd
Show file tree
Hide file tree
Showing 7 changed files with 67 additions and 75 deletions.
8 changes: 1 addition & 7 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 1 addition & 2 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@
"dependencies": {
"@babel/eslint-parser": "^7.22.9",
"@chia-carbon/core-registry-config": "^1.0.2",
"@chia-carbon/core-registry-logger": "^1.0.11",
"@chia-carbon/core-registry-logger": "^1.0.12",
"body-parser": "^1.20.2",
"cli-spinner": "^0.2.10",
"cors": "^2.8.5",
Expand Down Expand Up @@ -74,7 +74,6 @@
"eslint-plugin-es": "^4.1.0",
"eslint-plugin-mocha": "^10.2.0",
"husky": "^8.0.3",

"mocha": "^10.2.0",
"semver": "^7.5.4",
"sinon": "^16.1.0",
Expand Down
2 changes: 1 addition & 1 deletion src/datalayer/syncService.js
Original file line number Diff line number Diff line change
Expand Up @@ -297,7 +297,7 @@ const getRootHistory = (storeId) => {
};

const getRootDiff = (storeId, root1, root2) => {
if (!USE_SIMULATOR) {
if (USE_SIMULATOR) {
return Simulator.getMockedKvDiffFromStagingTable();
} else {
return dataLayer.getRootDiff(storeId, root1, root2);
Expand Down
126 changes: 61 additions & 65 deletions src/tasks/sync-audit-table.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,16 +5,17 @@ import { Organization, Audit, ModelKeys, Staging } from '../models';
import datalayer from '../datalayer';
import { decodeHex } from '../utils/datalayer-utils';
import dotenv from 'dotenv';
import { logger } from '../logger.js';
import { logger } from '../config/logger.cjs';
import { sequelize, sequelizeMirror } from '../database';

import { getConfig } from '../utils/config-loader';
import {
assertDataLayerAvailable,
assertWalletIsSynced,
} from '../utils/data-assertions';

dotenv.config();
import { CONFIG } from '../user-config';

const CONFIG = getConfig().APP;

let taskIsRunning = false;

Expand All @@ -30,7 +31,7 @@ const task = new Task('sync-audit', async () => {
// Log additional information if present in the error object
if (error.response && error.response.body) {
logger.error(
`Additional error details: ${JSON.stringify(error.response.body)}`,
`Additional error details: ${JSON.stringify(error.response.body)}`,
);
}
} finally {
Expand All @@ -39,37 +40,38 @@ const task = new Task('sync-audit', async () => {
});

const job = new SimpleIntervalJob(
{
seconds: CONFIG().CADT.TASKS?.AUDIT_SYNC_TASK_INTERVAL || 30,
runImmediately: true,
},
task,
'sync-audit',
{
seconds: CONFIG?.TASKS?.AUDIT_SYNC_TASK_INTERVAL || 30,
runImmediately: true,
},
task,
'sync-audit',
);

const processJob = async () => {
await assertDataLayerAvailable();
await assertWalletIsSynced();

logger.task('Syncing Audit Information');
logger.info('Syncing Audit Information');
const organizations = await Organization.findAll({
where: { subscribed: true },
raw: true,
});

for (const organization of organizations) {
console.log(`Syncing ${organization.name}`);
await syncOrganizationAudit(organization);
if (!CONFIG().CADT.USE_SIMULATOR) {
if (!CONFIG.USE_SIMULATOR) {
await new Promise((resolve) =>
setTimeout(
resolve,
(CONFIG().CADT.TASKS?.AUDIT_SYNC_TASK_INTERVAL || 30) * 1000,
),
setTimeout(
resolve,
(CONFIG.TASKS?.AUDIT_SYNC_TASK_INTERVAL || 30) * 1000,
),
);
}
}

if (!CONFIG().CADT.USE_SIMULATOR) {
if (!CONFIG.USE_SIMULATOR) {
await new Promise((resolve) => setTimeout(resolve, 5000));
}
};
Expand All @@ -81,13 +83,7 @@ async function createTransaction(callback, afterCommitCallbacks) {
let mirrorTransaction;

try {
// Check if the database is locked and wait until it's unlocked
/* while (await isDatabaseLocked()) {
logger.debug('Database is locked. Waiting...');
await waitFor(retryDelay);
}*/

logger.trace('Starting transaction');
logger.info('Starting transaction');
// Start a transaction
transaction = await sequelize.transaction();
mirrorTransaction = await sequelizeMirror.transaction();
Expand All @@ -103,7 +99,7 @@ async function createTransaction(callback, afterCommitCallbacks) {
await afterCommitCallback();
}

logger.trace('Commited transaction');
logger.info('Commited transaction');

return result;
} catch (error) {
Expand All @@ -118,13 +114,14 @@ async function createTransaction(callback, afterCommitCallbacks) {

const syncOrganizationAudit = async (organization) => {
try {
logger.task(`Syncing Audit: ${_.get(organization, 'name')}`);
logger.info(`Syncing Audit: ${_.get(organization, 'name')}`);
let afterCommitCallbacks = [];
const rootHistory = await datalayer.getRootHistory(organization.registryId);

let lastRootSaved;

if (CONFIG().CADT.USE_SIMULATOR) {
if (CONFIG.USE_SIMULATOR) {
console.log('USING MOCK ROOT HISTORY');
lastRootSaved = rootHistory[0];
lastRootSaved.rootHash = lastRootSaved.root_hash;
} else {
Expand All @@ -146,7 +143,7 @@ const syncOrganizationAudit = async (organization) => {
}

const historyIndex = rootHistory.findIndex(
(root) => root.root_hash === rootHash,
(root) => root.root_hash === rootHash,
);

if (!lastRootSaved) {
Expand All @@ -165,13 +162,13 @@ const syncOrganizationAudit = async (organization) => {
// is reset this will ensure that this organizations regsitry data is
// cleaned up on both the local db and mirror db and ready to resync
await Promise.all(
Object.keys(ModelKeys).map(async (modelKey) => {
ModelKeys[modelKey].destroy({
where: {
orgUid: organization.orgUid,
},
});
}),
Object.keys(ModelKeys).map(async (modelKey) => {
ModelKeys[modelKey].destroy({
where: {
orgUid: organization.orgUid,
},
});
}),
);

return;
Expand All @@ -189,9 +186,9 @@ const syncOrganizationAudit = async (organization) => {
}

const kvDiff = await datalayer.getRootDiff(
organization.registryId,
root1.root_hash,
root2.root_hash,
organization.registryId,
root1.root_hash,
root2.root_hash,
);

if (_.isEmpty(kvDiff)) {
Expand All @@ -200,16 +197,16 @@ const syncOrganizationAudit = async (organization) => {

// 0x636f6d6d656e74 is hex for 'comment'
const comment = kvDiff.filter(
(diff) =>
(diff.key === '636f6d6d656e74' || diff.key === '0x636f6d6d656e74') &&
diff.type === 'INSERT',
(diff) =>
(diff.key === '636f6d6d656e74' || diff.key === '0x636f6d6d656e74') &&
diff.type === 'INSERT',
);

// 0x617574686F72 is hex for 'author'
const author = kvDiff.filter(
(diff) =>
(diff.key === '617574686f72' || diff.key === '0x617574686F72') &&
diff.type === 'INSERT',
(diff) =>
(diff.key === '617574686f72' || diff.key === '0x617574686F72') &&
diff.type === 'INSERT',
);

// Process any deletes in the kv diff first to ensure correct processing order
Expand All @@ -219,7 +216,6 @@ const syncOrganizationAudit = async (organization) => {
});

const homeOrg = await Organization.getHomeOrg();
// console.log(kvDiff);

const updateTransaction = async (transaction, mirrorTransaction) => {
for (const diff of kvDiff) {
Expand All @@ -236,34 +232,34 @@ const syncOrganizationAudit = async (organization) => {
change: decodeHex(diff.value),
onchainConfirmationTimeStamp: root2.timestamp,
comment: _.get(
JSON.parse(decodeHex(_.get(comment, '[0].value', '7b7d'))),
'comment',
'',
JSON.parse(decodeHex(_.get(comment, '[0].value', '7b7d'))),
'comment',
'',
),
author: _.get(
JSON.parse(decodeHex(_.get(author, '[0].value', '7b7d'))),
'author',
'',
JSON.parse(decodeHex(_.get(author, '[0].value', '7b7d'))),
'author',
'',
),
};

if (modelKey) {
const record = JSON.parse(decodeHex(diff.value));
const primaryKeyValue =
record[ModelKeys[modelKey].primaryKeyAttributes[0]];
record[ModelKeys[modelKey].primaryKeyAttributes[0]];

if (diff.type === 'INSERT') {
logger.trace(`INSERTING: ${modelKey} - ${primaryKeyValue}`);
logger.info(`INSERTING: ${modelKey} - ${primaryKeyValue}`);
await ModelKeys[modelKey].upsert(record, {
transaction,
mirrorTransaction,
});
} else if (diff.type === 'DELETE') {
logger.trace(`DELETING: ${modelKey} - ${primaryKeyValue}`);
logger.info(`DELETING: ${modelKey} - ${primaryKeyValue}`);
await ModelKeys[modelKey].destroy({
where: {
[ModelKeys[modelKey].primaryKeyAttributes[0]]:
primaryKeyValue,
primaryKeyValue,
},
transaction,
mirrorTransaction,
Expand All @@ -277,12 +273,12 @@ const syncOrganizationAudit = async (organization) => {
'units',
'projects',
].includes(modelKey)
? primaryKeyValue
: undefined;
? primaryKeyValue
: undefined;

if (stagingUuid) {
afterCommitCallbacks.push(async () => {
logger.trace(`DELETING STAGING: ${stagingUuid}`);
logger.info(`DELETING STAGING: ${stagingUuid}`);
await Staging.destroy({
where: { uuid: stagingUuid },
});
Expand All @@ -294,18 +290,18 @@ const syncOrganizationAudit = async (organization) => {
// Create the Audit record
await Audit.create(auditData, { transaction, mirrorTransaction });
await Organization.update(
{ registryHash: root2.root_hash },
{
where: { orgUid: organization.orgUid },
transaction,
mirrorTransaction,
},
{ registryHash: root2.root_hash },
{
where: { orgUid: organization.orgUid },
transaction,
mirrorTransaction,
},
);
}
}
};

return createTransaction(updateTransaction, afterCommitCallbacks);
return await createTransaction(updateTransaction, afterCommitCallbacks);
} catch (error) {
logger.error('Error syncing org audit', error);
}
Expand Down
1 change: 1 addition & 0 deletions src/utils/config-loader.js
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ export const getConfig = _.memoize(() => {
if (process.env.USE_SIMULATOR) {
defaultConfig.APP.USE_SIMULATOR = true;
defaultConfig.APP.CHIA_NETWORK = 'testnet';
defaultConfig.APP.TASKS.AUDIT_SYNC_TASK_INTERVAL = 30;
logger.info(`ENV FILE OVERRIDE: RUNNING IN SIMULATOR MODE`);
}

Expand Down
1 change: 1 addition & 0 deletions tests/integration/project.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ describe('Project Resource Integration Tests', function () {
// Now push the staging table live
await testFixtures.commitStagingRecords();
await testFixtures.waitForDataLayerSync();
await testFixtures.waitForDataLayerSync();

// The staging table should be empty after committing
expect(await testFixtures.getLastCreatedStagingRecord()).to.equal(
Expand Down
1 change: 1 addition & 0 deletions tests/test-fixtures/staging-fixtures.js
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ export const resetStagingTable = async () => {

export const getLastCreatedStagingRecord = async () => {
const result = await supertest(app).get('/v1/staging');
console.log('!!!!!!!!!!!!!!!!!!', result.body);
expect(result.body).to.be.an('array');
return _.last(result.body);
};
Expand Down

0 comments on commit 54ef3dd

Please sign in to comment.